diff --git a/pkg/amqp-bunny/AmqpConsumer.php b/pkg/amqp-bunny/AmqpConsumer.php index 07b361d2e..0bfd0b94a 100644 --- a/pkg/amqp-bunny/AmqpConsumer.php +++ b/pkg/amqp-bunny/AmqpConsumer.php @@ -3,18 +3,20 @@ namespace Enqueue\AmqpBunny; use Bunny\Channel; -use Bunny\Client; use Bunny\Message; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpMessage as InteropAmqpMessage; use Interop\Amqp\AmqpQueue as InteropAmqpQueue; -use Interop\Amqp\Impl\AmqpMessage; -use Interop\Queue\Exception; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrMessage; class AmqpConsumer implements InteropAmqpConsumer { + /** + * @var AmqpContext + */ + private $context; + /** * @var Channel */ @@ -30,11 +32,6 @@ class AmqpConsumer implements InteropAmqpConsumer */ private $buffer; - /** - * @var bool - */ - private $isInit; - /** * @var string */ @@ -51,25 +48,19 @@ class AmqpConsumer implements InteropAmqpConsumer private $consumerTag; /** - * @var Message - */ - private $bunnyMessages = []; - - /** - * @param Channel $channel + * @param AmqpContext $context * @param InteropAmqpQueue $queue * @param Buffer $buffer * @param string $receiveMethod */ - public function __construct(Channel $channel, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod) + public function __construct(AmqpContext $context, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod) { - $this->channel = $channel; + $this->context = $context; + $this->channel = $context->getBunnyChannel(); $this->queue = $queue; $this->buffer = $buffer; $this->receiveMethod = $receiveMethod; $this->flags = self::FLAG_NOPARAM; - - $this->isInit = false; } /** @@ -77,10 +68,6 @@ public function __construct(Channel $channel, InteropAmqpQueue $queue, Buffer $b */ public function setConsumerTag($consumerTag) { - if ($this->isInit) { - throw new Exception('Consumer tag is not mutable after it has been subscribed to broker'); - } - $this->consumerTag = $consumerTag; } @@ -154,9 +141,7 @@ public function receive($timeout = 0) public function receiveNoWait() { if ($message = $this->channel->get($this->queue->getQueueName(), (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK))) { - $this->bunnyMessages[$message->deliveryTag] = $message; - - return $this->convertMessage($message); + return $this->context->convertMessage($message); } } @@ -167,11 +152,8 @@ public function acknowledge(PsrMessage $message) { InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class); - if (isset($this->bunnyMessages[$message->getDeliveryTag()])) { - $this->channel->ack($this->bunnyMessages[$message->getDeliveryTag()]); - - unset($this->bunnyMessages[$message->getDeliveryTag()]); - } + $bunnyMessage = new Message('', $message->getDeliveryTag(), '', '', '', [], ''); + $this->channel->ack($bunnyMessage); } /** @@ -182,41 +164,8 @@ public function reject(PsrMessage $message, $requeue = false) { InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class); - if (isset($this->bunnyMessages[$message->getDeliveryTag()])) { - $this->channel->reject($this->bunnyMessages[$message->getDeliveryTag()], $requeue); - - unset($this->bunnyMessages[$message->getDeliveryTag()]); - } - } - - /** - * @param Message $bunnyMessage - * - * @return InteropAmqpMessage - */ - private function convertMessage(Message $bunnyMessage) - { - $headers = $bunnyMessage->headers; - - $properties = []; - if (isset($headers['application_headers'])) { - $properties = $headers['application_headers']; - } - unset($headers['application_headers']); - - if (array_key_exists('timestamp', $headers)) { - /** @var \DateTime $date */ - $date = $headers['timestamp']; - - $headers['timestamp'] = (int) $date->format('U'); - } - - $message = new AmqpMessage($bunnyMessage->content, $properties, $headers); - $message->setDeliveryTag($bunnyMessage->deliveryTag); - $message->setRedelivered($bunnyMessage->redelivered); - $message->setRoutingKey($bunnyMessage->routingKey); - - return $message; + $bunnyMessage = new Message('', $message->getDeliveryTag(), '', '', '', [], ''); + $this->channel->reject($bunnyMessage, $requeue); } /** @@ -244,34 +193,12 @@ private function receiveBasicGet($timeout) */ private function receiveBasicConsume($timeout) { - if (false === $this->isInit) { - $callback = function (Message $message, Channel $channel, Client $bunny) { - $receivedMessage = $this->convertMessage($message); - $receivedMessage->setConsumerTag($message->consumerTag); - - $this->bunnyMessages[$message->deliveryTag] = $message; - $this->buffer->push($receivedMessage->getConsumerTag(), $receivedMessage); - - $bunny->stop(); - }; - - $frame = $this->channel->consume( - $callback, - $this->queue->getQueueName(), - $this->getConsumerTag() ?: $this->getQueue()->getConsumerTag(), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT) - ); - - $this->consumerTag = $frame->consumerTag; - - if (empty($this->consumerTag)) { - throw new Exception('Got empty consumer tag'); - } + if (false == $this->consumerTag) { + $this->context->subscribe($this, function (InteropAmqpMessage $message) { + $this->buffer->push($message->getConsumerTag(), $message); - $this->isInit = true; + return false; + }); } if ($message = $this->buffer->pop($this->consumerTag)) { @@ -281,7 +208,7 @@ private function receiveBasicConsume($timeout) while (true) { $start = microtime(true); - $this->channel->getClient()->run($timeout / 1000); + $this->context->consume($timeout); if ($message = $this->buffer->pop($this->consumerTag)) { return $message; diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index 1b0edb723..e034d75fe 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -127,10 +127,10 @@ public function createConsumer(PsrDestination $destination) $queue = $this->createTemporaryQueue(); $this->bind(new AmqpBind($destination, $queue, $queue->getQueueName())); - return new AmqpConsumer($this->getBunnyChannel(), $queue, $this->buffer, $this->config['receive_method']); + return new AmqpConsumer($this, $queue, $this->buffer, $this->config['receive_method']); } - return new AmqpConsumer($this->getBunnyChannel(), $destination, $this->buffer, $this->config['receive_method']); + return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']); } /** @@ -411,11 +411,13 @@ public function getBunnyChannel() } /** + * @internal It must be used here and in the consumer only + * * @param Message $bunnyMessage * * @return InteropAmqpMessage */ - private function convertMessage(Message $bunnyMessage) + public function convertMessage(Message $bunnyMessage) { $headers = $bunnyMessage->headers; @@ -425,7 +427,7 @@ private function convertMessage(Message $bunnyMessage) } unset($headers['application_headers']); - if (array_key_exists('timestamp', $headers)) { + if (array_key_exists('timestamp', $headers) && $headers['timestamp']) { /** @var \DateTime $date */ $date = $headers['timestamp']; diff --git a/pkg/amqp-bunny/AmqpProducer.php b/pkg/amqp-bunny/AmqpProducer.php index 753a2d69b..158ce6bf8 100644 --- a/pkg/amqp-bunny/AmqpProducer.php +++ b/pkg/amqp-bunny/AmqpProducer.php @@ -77,7 +77,7 @@ public function send(PsrDestination $destination, PsrMessage $message) $amqpProperties = $message->getHeaders(); - if (array_key_exists('timestamp', $amqpProperties)) { + if (array_key_exists('timestamp', $amqpProperties) && null !== $amqpProperties['timestamp']) { $amqpProperties['timestamp'] = \DateTime::createFromFormat('U', $amqpProperties['timestamp']); } diff --git a/pkg/amqp-bunny/Tests/AmqpConsumerTest.php b/pkg/amqp-bunny/Tests/AmqpConsumerTest.php index bc716174a..938dff5d2 100644 --- a/pkg/amqp-bunny/Tests/AmqpConsumerTest.php +++ b/pkg/amqp-bunny/Tests/AmqpConsumerTest.php @@ -5,8 +5,8 @@ use Bunny\Channel; use Bunny\Client; use Bunny\Message; -use Bunny\Protocol\MethodBasicConsumeOkFrame; use Enqueue\AmqpBunny\AmqpConsumer; +use Enqueue\AmqpBunny\AmqpContext; use Enqueue\AmqpBunny\Buffer; use Enqueue\Null\NullMessage; use Enqueue\Test\ClassExtensionTrait; @@ -30,7 +30,7 @@ public function testShouldImplementConsumerInterface() public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments() { new AmqpConsumer( - $this->createChannelMock(), + $this->createContextMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get' @@ -41,14 +41,14 @@ public function testShouldReturnQueue() { $queue = new AmqpQueue('aName'); - $consumer = new AmqpConsumer($this->createChannelMock(), $queue, new Buffer(), 'basic_get'); + $consumer = new AmqpConsumer($this->createContextMock(), $queue, new Buffer(), 'basic_get'); $this->assertSame($queue, $consumer->getQueue()); } public function testOnAcknowledgeShouldThrowExceptionIfNotAmqpMessage() { - $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $consumer = new AmqpConsumer($this->createContextMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but'); @@ -58,7 +58,7 @@ public function testOnAcknowledgeShouldThrowExceptionIfNotAmqpMessage() public function testOnRejectShouldThrowExceptionIfNotAmqpMessage() { - $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $consumer = new AmqpConsumer($this->createContextMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but'); @@ -68,78 +68,78 @@ public function testOnRejectShouldThrowExceptionIfNotAmqpMessage() public function testOnAcknowledgeShouldAcknowledgeMessage() { - $bunnyMessage = new Message('', 'delivery-tag', true, '', '', [], 'body'); - - $channel = $this->createChannelMock(); - $channel - ->expects($this->once()) - ->method('get') - ->willReturn($bunnyMessage) - ; + $channel = $this->createBunnyChannelMock(); $channel ->expects($this->once()) ->method('ack') - ->with($this->identicalTo($bunnyMessage)) - ; + ->with($this->isInstanceOf(Message::class)) + ->willReturnCallback(function (Message $message) { + $this->assertSame('theDeliveryTag', $message->deliveryTag); + }); - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getBunnyChannel') + ->willReturn($channel) + ; - $message = $consumer->receiveNoWait(); + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - // guard - $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $message = new AmqpMessage(); + $message->setDeliveryTag('theDeliveryTag'); $consumer->acknowledge($message); } public function testOnRejectShouldRejectMessage() { - $bunnyMessage = new Message('', 'delivery-tag', true, '', '', [], 'body'); - - $channel = $this->createChannelMock(); - $channel - ->expects($this->once()) - ->method('get') - ->willReturn($bunnyMessage) - ; + $channel = $this->createBunnyChannelMock(); $channel ->expects($this->once()) ->method('reject') - ->with($this->identicalTo($bunnyMessage), $this->isFalse()) - ; + ->with($this->isInstanceOf(Message::class), false) + ->willReturnCallback(function (Message $message) { + $this->assertSame('theDeliveryTag', $message->deliveryTag); + }); - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getBunnyChannel') + ->willReturn($channel) + ; - $message = $consumer->receiveNoWait(); + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - // guard - $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $message = new AmqpMessage(); + $message->setDeliveryTag('theDeliveryTag'); $consumer->reject($message, false); } public function testOnRejectShouldRequeueMessage() { - $bunnyMessage = new Message('', 'delivery-tag', true, '', '', [], 'body'); - - $channel = $this->createChannelMock(); - $channel - ->expects($this->once()) - ->method('get') - ->willReturn($bunnyMessage) - ; + $channel = $this->createBunnyChannelMock(); $channel ->expects($this->once()) ->method('reject') - ->with($this->identicalTo($bunnyMessage), $this->isTrue()) - ; + ->with($this->isInstanceOf(Message::class), true) + ->willReturnCallback(function (Message $message) { + $this->assertSame('theDeliveryTag', $message->deliveryTag); + }); - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getBunnyChannel') + ->willReturn($channel) + ; - $message = $consumer->receiveNoWait(); + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - // guard - $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $message = new AmqpMessage(); + $message->setDeliveryTag('theDeliveryTag'); $consumer->reject($message, true); } @@ -148,80 +148,66 @@ public function testShouldReturnMessageOnReceiveNoWait() { $bunnyMessage = new Message('', 'delivery-tag', true, '', '', [], 'body'); - $channel = $this->createChannelMock(); + $message = new AmqpMessage(); + + $channel = $this->createBunnyChannelMock(); $channel ->expects($this->once()) ->method('get') ->willReturn($bunnyMessage) ; - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getBunnyChannel') + ->willReturn($channel) + ; + $context + ->expects($this->once()) + ->method('convertMessage') + ->with($this->identicalTo($bunnyMessage)) + ->willReturn($message) + ; - $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - $message = $consumer->receiveNoWait(); + $receivedMessage = $consumer->receiveNoWait(); - $this->assertInstanceOf(AmqpMessage::class, $message); - $this->assertSame('body', $message->getBody()); - $this->assertSame('delivery-tag', $message->getDeliveryTag()); - $this->assertTrue($message->isRedelivered()); + $this->assertSame($message, $receivedMessage); } public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() { $bunnyMessage = new Message('', 'delivery-tag', true, '', '', [], 'body'); - $channel = $this->createChannelMock(); + $message = new AmqpMessage(); + + $channel = $this->createBunnyChannelMock(); $channel ->expects($this->once()) ->method('get') ->willReturn($bunnyMessage) ; - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - - $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); - - $message = $consumer->receive(); - - $this->assertInstanceOf(AmqpMessage::class, $message); - $this->assertSame('body', $message->getBody()); - $this->assertSame('delivery-tag', $message->getDeliveryTag()); - $this->assertTrue($message->isRedelivered()); - } - - public function testShouldCallExpectedMethodsWhenReceiveWithBasicConsumeMethod() - { - $frame = new MethodBasicConsumeOkFrame(); - $frame->consumerTag = 'theConsumerTag'; - - $client = $this->createClientMock(); - $client - ->expects($this->atLeastOnce()) - ->method('run') - ; - - $channel = $this->createChannelMock(); - $channel + $context = $this->createContextMock(); + $context ->expects($this->once()) - ->method('consume') - ->willReturn($frame) + ->method('getBunnyChannel') + ->willReturn($channel) ; - $channel - ->expects($this->atLeastOnce()) - ->method('getClient') - ->willReturn($client) + $context + ->expects($this->once()) + ->method('convertMessage') + ->with($this->identicalTo($bunnyMessage)) + ->willReturn($message) ; - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_consume'); + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); - $consumer->receive(1234); + $receivedMessage = $consumer->receive(); - $this->assertSame('theConsumerTag', $consumer->getConsumerTag()); + $this->assertSame($message, $receivedMessage); } /** @@ -232,10 +218,18 @@ public function createClientMock() return $this->createMock(Client::class); } + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + public function createContextMock() + { + return $this->createMock(AmqpContext::class); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Channel */ - public function createChannelMock() + public function createBunnyChannelMock() { return $this->createMock(Channel::class); } diff --git a/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index 034eea13b..1ef23b65b 100644 --- a/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -3,8 +3,8 @@ namespace Enqueue\AmqpBunny\Tests\Spec; use Enqueue\AmqpBunny\AmqpConnectionFactory; +use Enqueue\AmqpBunny\AmqpContext; use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; -use Interop\Amqp\AmqpContext; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -30,9 +30,9 @@ protected function createContext() } /** - * {@inheritdoc} - * * @param AmqpContext $context + * + * {@inheritdoc} */ protected function createQueue(PsrContext $context, $queueName) { diff --git a/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php index 629eb3ec0..531389a02 100644 --- a/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php +++ b/pkg/amqp-bunny/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -3,6 +3,7 @@ namespace Enqueue\AmqpBunny\Tests\Spec; use Enqueue\AmqpBunny\AmqpConnectionFactory; +use Enqueue\AmqpBunny\AmqpContext; use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -24,6 +25,8 @@ protected function createContext() } /** + * @param AmqpContext $context + * * {@inheritdoc} */ protected function createQueue(PsrContext $context, $queueName) @@ -31,6 +34,7 @@ protected function createQueue(PsrContext $context, $queueName) $queue = parent::createQueue($context, $queueName); $context->declareQueue($queue); + $context->purgeQueue($queue); return $queue; } diff --git a/pkg/amqp-bunny/composer.json b/pkg/amqp-bunny/composer.json index ca5eea964..9049d5b8a 100644 --- a/pkg/amqp-bunny/composer.json +++ b/pkg/amqp-bunny/composer.json @@ -16,7 +16,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5.2@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/amqp-ext/AmqpConsumer.php b/pkg/amqp-ext/AmqpConsumer.php index 4a3591d37..eb47c0f44 100644 --- a/pkg/amqp-ext/AmqpConsumer.php +++ b/pkg/amqp-ext/AmqpConsumer.php @@ -6,7 +6,6 @@ use Interop\Amqp\AmqpMessage as InteropAmqpMessage; use Interop\Amqp\AmqpQueue; use Interop\Amqp\Impl\AmqpMessage; -use Interop\Queue\Exception; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrMessage; @@ -32,11 +31,6 @@ class AmqpConsumer implements InteropAmqpConsumer */ private $extQueue; - /** - * @var bool - */ - private $isInit; - /** * @var string */ @@ -65,8 +59,6 @@ public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buff $this->buffer = $buffer; $this->receiveMethod = $receiveMethod; $this->flags = self::FLAG_NOPARAM; - - $this->isInit = false; } /** @@ -74,10 +66,6 @@ public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buff */ public function setConsumerTag($consumerTag) { - if ($this->isInit) { - throw new Exception('Consumer tag is not mutable after it has been subscribed to broker'); - } - $this->consumerTag = $consumerTag; } @@ -157,7 +145,7 @@ public function receive($timeout = 0) public function receiveNoWait() { if ($extMessage = $this->getExtQueue()->get(Flags::convertConsumerFlags($this->flags))) { - return $this->convertMessage($extMessage); + return $this->context->convertMessage($extMessage); } } @@ -213,85 +201,44 @@ private function receiveBasicGet($timeout) */ private function receiveBasicConsume($timeout) { - if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) { - return $message; + if (false == $this->consumerTag) { + $this->context->subscribe($this, function (InteropAmqpMessage $message) { + $this->buffer->push($message->getConsumerTag(), $message); + + return false; + }); } - /** @var \AMQPQueue $extQueue */ - $extConnection = $this->getExtQueue()->getChannel()->getConnection(); + if ($message = $this->buffer->pop($this->consumerTag)) { + return $message; + } - $originalTimeout = $extConnection->getReadTimeout(); - try { - $extConnection->setReadTimeout($timeout / 1000); + while (true) { + $start = microtime(true); - if (false == $this->isInit) { - $this->getExtQueue()->consume(null, Flags::convertConsumerFlags($this->flags), $this->consumerTag); + $this->context->consume($timeout); - $this->isInit = true; + if ($message = $this->buffer->pop($this->consumerTag)) { + return $message; } - /** @var AmqpMessage|null $message */ - $message = null; - - $this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) { - $message = $this->convertMessage($extEnvelope); - $message->setConsumerTag($q->getConsumerTag()); - - if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) { - return false; - } + // is here when consumed message is not for this consumer - // not our message, put it to buffer and continue. - $this->buffer->push($q->getConsumerTag(), $message); - - $message = null; + // as timeout is infinite have to continue consumption, but it can overflow message buffer + if ($timeout <= 0) { + continue; + } - return true; - }, AMQP_JUST_CONSUME); + // compute remaining timeout and continue until time is up + $stop = microtime(true); + $timeout -= ($stop - $start) * 1000; - return $message; - } catch (\AMQPQueueException $e) { - if ('Consumer timeout exceed' == $e->getMessage()) { - return null; + if ($timeout <= 0) { + break; } - - throw $e; - } finally { - $extConnection->setReadTimeout($originalTimeout); } } - /** - * @param \AMQPEnvelope $extEnvelope - * - * @return AmqpMessage - */ - private function convertMessage(\AMQPEnvelope $extEnvelope) - { - $message = new AmqpMessage( - $extEnvelope->getBody(), - $extEnvelope->getHeaders(), - [ - 'message_id' => $extEnvelope->getMessageId(), - 'correlation_id' => $extEnvelope->getCorrelationId(), - 'app_id' => $extEnvelope->getAppId(), - 'type' => $extEnvelope->getType(), - 'content_encoding' => $extEnvelope->getContentEncoding(), - 'content_type' => $extEnvelope->getContentType(), - 'expiration' => $extEnvelope->getExpiration(), - 'priority' => $extEnvelope->getPriority(), - 'reply_to' => $extEnvelope->getReplyTo(), - 'timestamp' => $extEnvelope->getTimeStamp(), - 'user_id' => $extEnvelope->getUserId(), - ] - ); - $message->setRedelivered($extEnvelope->isRedelivery()); - $message->setDeliveryTag($extEnvelope->getDeliveryTag()); - $message->setRoutingKey($extEnvelope->getRoutingKey()); - - return $message; - } - /** * @return \AMQPQueue */ diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index d80442822..f7515d6ac 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -384,11 +384,13 @@ public function consume($timeout = 0) } /** + * @internal It must be used here and in the consumer only + * * @param \AMQPEnvelope $extEnvelope * * @return AmqpMessage */ - private function convertMessage(\AMQPEnvelope $extEnvelope) + public function convertMessage(\AMQPEnvelope $extEnvelope) { $message = new AmqpMessage( $extEnvelope->getBody(), diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index bb71e911c..b3916896b 100644 --- a/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -3,6 +3,7 @@ namespace Enqueue\AmqpExt\Tests\Spec; use Enqueue\AmqpExt\AmqpConnectionFactory; +use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -24,6 +25,8 @@ protected function createContext() } /** + * @param AmqpContext $context + * * {@inheritdoc} */ protected function createQueue(PsrContext $context, $queueName) @@ -31,6 +34,7 @@ protected function createQueue(PsrContext $context, $queueName) $queue = parent::createQueue($context, $queueName); $context->declareQueue($queue); + $context->purgeQueue($queue); return $queue; } diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php index 4ae6a1bae..251ea07db 100644 --- a/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php +++ b/pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -3,6 +3,7 @@ namespace Enqueue\AmqpExt\Tests\Spec; use Enqueue\AmqpExt\AmqpConnectionFactory; +use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -24,6 +25,8 @@ protected function createContext() } /** + * @param AmqpContext $context + * * {@inheritdoc} */ protected function createQueue(PsrContext $context, $queueName) @@ -31,6 +34,7 @@ protected function createQueue(PsrContext $context, $queueName) $queue = parent::createQueue($context, $queueName); $context->declareQueue($queue); + $context->purgeQueue($queue); return $queue; } diff --git a/pkg/amqp-ext/composer.json b/pkg/amqp-ext/composer.json index 4821ddd1e..ae7306605 100644 --- a/pkg/amqp-ext/composer.json +++ b/pkg/amqp-ext/composer.json @@ -16,7 +16,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5.2@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "empi89/php-amqp-stubs": "*@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" diff --git a/pkg/amqp-lib/AmqpConsumer.php b/pkg/amqp-lib/AmqpConsumer.php index f08289bd9..cd10f39df 100644 --- a/pkg/amqp-lib/AmqpConsumer.php +++ b/pkg/amqp-lib/AmqpConsumer.php @@ -5,17 +5,17 @@ use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpMessage as InteropAmqpMessage; use Interop\Amqp\AmqpQueue as InteropAmqpQueue; -use Interop\Amqp\Impl\AmqpMessage; -use Interop\Queue\Exception; use Interop\Queue\InvalidMessageException; use Interop\Queue\PsrMessage; use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Exception\AMQPTimeoutException; -use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; -use PhpAmqpLib\Wire\AMQPTable; class AmqpConsumer implements InteropAmqpConsumer { + /** + * @var AmqpContext + */ + private $context; + /** * @var AMQPChannel */ @@ -31,11 +31,6 @@ class AmqpConsumer implements InteropAmqpConsumer */ private $buffer; - /** - * @var bool - */ - private $isInit; - /** * @var string */ @@ -52,20 +47,19 @@ class AmqpConsumer implements InteropAmqpConsumer private $consumerTag; /** - * @param AMQPChannel $channel + * @param AmqpContext $context * @param InteropAmqpQueue $queue * @param Buffer $buffer * @param string $receiveMethod */ - public function __construct(AMQPChannel $channel, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod) + public function __construct(AmqpContext $context, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod) { - $this->channel = $channel; + $this->context = $context; + $this->channel = $context->getLibChannel(); $this->queue = $queue; $this->buffer = $buffer; $this->receiveMethod = $receiveMethod; $this->flags = self::FLAG_NOPARAM; - - $this->isInit = false; } /** @@ -73,10 +67,6 @@ public function __construct(AMQPChannel $channel, InteropAmqpQueue $queue, Buffe */ public function setConsumerTag($consumerTag) { - if ($this->isInit) { - throw new Exception('Consumer tag is not mutable after it has been subscribed to broker'); - } - $this->consumerTag = $consumerTag; } @@ -152,7 +142,7 @@ public function receive($timeout = 0) public function receiveNoWait() { if ($message = $this->channel->basic_get($this->queue->getQueueName(), (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK))) { - return $this->convertMessage($message); + return $this->context->convertMessage($message); } } @@ -177,30 +167,6 @@ public function reject(PsrMessage $message, $requeue = false) $this->channel->basic_reject($message->getDeliveryTag(), $requeue); } - /** - * @param LibAMQPMessage $amqpMessage - * - * @return InteropAmqpMessage - */ - private function convertMessage(LibAMQPMessage $amqpMessage) - { - $headers = new AMQPTable($amqpMessage->get_properties()); - $headers = $headers->getNativeData(); - - $properties = []; - if (isset($headers['application_headers'])) { - $properties = $headers['application_headers']; - } - unset($headers['application_headers']); - - $message = new AmqpMessage($amqpMessage->getBody(), $properties, $headers); - $message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']); - $message->setRedelivered($amqpMessage->delivery_info['redelivered']); - $message->setRoutingKey($amqpMessage->delivery_info['routing_key']); - - return $message; - } - /** * @param int $timeout * @@ -226,63 +192,41 @@ private function receiveBasicGet($timeout) */ private function receiveBasicConsume($timeout) { - if (false === $this->isInit) { - $callback = function (LibAMQPMessage $message) { - $receivedMessage = $this->convertMessage($message); - $receivedMessage->setConsumerTag($message->delivery_info['consumer_tag']); - - $this->buffer->push($receivedMessage->getConsumerTag(), $receivedMessage); - }; - - $consumerTag = $this->channel->basic_consume( - $this->queue->getQueueName(), - $this->getConsumerTag() ?: $this->getQueue()->getConsumerTag(), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE), - (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT), - $callback - ); - - $this->consumerTag = $consumerTag ?: $this->getQueue()->getConsumerTag(); - - if (empty($this->consumerTag)) { - throw new Exception('Got empty consumer tag'); - } + if (false == $this->consumerTag) { + $this->context->subscribe($this, function (InteropAmqpMessage $message) { + $this->buffer->push($message->getConsumerTag(), $message); - $this->isInit = true; + return false; + }); } if ($message = $this->buffer->pop($this->consumerTag)) { return $message; } - try { - while (true) { - $start = microtime(true); + while (true) { + $start = microtime(true); - $this->channel->wait(null, false, $timeout / 1000); + $this->context->consume($timeout); - if ($message = $this->buffer->pop($this->consumerTag)) { - return $message; - } + if ($message = $this->buffer->pop($this->consumerTag)) { + return $message; + } - // is here when consumed message is not for this consumer + // is here when consumed message is not for this consumer - // as timeout is infinite have to continue consumption, but it can overflow message buffer - if ($timeout <= 0) { - continue; - } + // as timeout is infinite have to continue consumption, but it can overflow message buffer + if ($timeout <= 0) { + continue; + } - // compute remaining timeout and continue until time is up - $stop = microtime(true); - $timeout -= ($stop - $start) * 1000; + // compute remaining timeout and continue until time is up + $stop = microtime(true); + $timeout -= ($stop - $start) * 1000; - if ($timeout <= 0) { - break; - } + if ($timeout <= 0) { + break; } - } catch (AMQPTimeoutException $e) { } } } diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 5c2226c41..d4616c60e 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -120,10 +120,10 @@ public function createConsumer(PsrDestination $destination) $queue = $this->createTemporaryQueue(); $this->bind(new AmqpBind($destination, $queue, $queue->getQueueName())); - return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->config['receive_method']); + return new AmqpConsumer($this, $queue, $this->buffer, $this->config['receive_method']); } - return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->config['receive_method']); + return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']); } /** @@ -131,7 +131,7 @@ public function createConsumer(PsrDestination $destination) */ public function createProducer() { - $producer = new AmqpProducer($this->getChannel(), $this); + $producer = new AmqpProducer($this->getLibChannel(), $this); $producer->setDelayStrategy($this->delayStrategy); return $producer; @@ -142,7 +142,7 @@ public function createProducer() */ public function createTemporaryQueue() { - list($name) = $this->getChannel()->queue_declare('', false, false, true, false); + list($name) = $this->getLibChannel()->queue_declare('', false, false, true, false); $queue = $this->createQueue($name); $queue->addFlag(InteropAmqpQueue::FLAG_EXCLUSIVE); @@ -155,7 +155,7 @@ public function createTemporaryQueue() */ public function declareTopic(InteropAmqpTopic $topic) { - $this->getChannel()->exchange_declare( + $this->getLibChannel()->exchange_declare( $topic->getTopicName(), $topic->getType(), (bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_PASSIVE), @@ -172,7 +172,7 @@ public function declareTopic(InteropAmqpTopic $topic) */ public function deleteTopic(InteropAmqpTopic $topic) { - $this->getChannel()->exchange_delete( + $this->getLibChannel()->exchange_delete( $topic->getTopicName(), (bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_IFUNUSED), (bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_NOWAIT) @@ -184,7 +184,7 @@ public function deleteTopic(InteropAmqpTopic $topic) */ public function declareQueue(InteropAmqpQueue $queue) { - list(, $messageCount) = $this->getChannel()->queue_declare( + list(, $messageCount) = $this->getLibChannel()->queue_declare( $queue->getQueueName(), (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_PASSIVE), (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_DURABLE), @@ -202,7 +202,7 @@ public function declareQueue(InteropAmqpQueue $queue) */ public function deleteQueue(InteropAmqpQueue $queue) { - $this->getChannel()->queue_delete( + $this->getLibChannel()->queue_delete( $queue->getQueueName(), (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_IFUNUSED), (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_IFEMPTY), @@ -215,7 +215,7 @@ public function deleteQueue(InteropAmqpQueue $queue) */ public function purgeQueue(InteropAmqpQueue $queue) { - $this->getChannel()->queue_purge( + $this->getLibChannel()->queue_purge( $queue->getQueueName(), (bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_NOWAIT) ); @@ -232,7 +232,7 @@ public function bind(InteropAmqpBind $bind) // bind exchange to exchange if ($bind->getSource() instanceof InteropAmqpTopic && $bind->getTarget() instanceof InteropAmqpTopic) { - $this->getChannel()->exchange_bind( + $this->getLibChannel()->exchange_bind( $bind->getTarget()->getTopicName(), $bind->getSource()->getTopicName(), $bind->getRoutingKey(), @@ -241,7 +241,7 @@ public function bind(InteropAmqpBind $bind) ); // bind queue to exchange } elseif ($bind->getSource() instanceof InteropAmqpQueue) { - $this->getChannel()->queue_bind( + $this->getLibChannel()->queue_bind( $bind->getSource()->getQueueName(), $bind->getTarget()->getTopicName(), $bind->getRoutingKey(), @@ -250,7 +250,7 @@ public function bind(InteropAmqpBind $bind) ); // bind exchange to queue } else { - $this->getChannel()->queue_bind( + $this->getLibChannel()->queue_bind( $bind->getTarget()->getQueueName(), $bind->getSource()->getTopicName(), $bind->getRoutingKey(), @@ -271,7 +271,7 @@ public function unbind(InteropAmqpBind $bind) // bind exchange to exchange if ($bind->getSource() instanceof InteropAmqpTopic && $bind->getTarget() instanceof InteropAmqpTopic) { - $this->getChannel()->exchange_unbind( + $this->getLibChannel()->exchange_unbind( $bind->getTarget()->getTopicName(), $bind->getSource()->getTopicName(), $bind->getRoutingKey(), @@ -280,7 +280,7 @@ public function unbind(InteropAmqpBind $bind) ); // bind queue to exchange } elseif ($bind->getSource() instanceof InteropAmqpQueue) { - $this->getChannel()->queue_unbind( + $this->getLibChannel()->queue_unbind( $bind->getSource()->getQueueName(), $bind->getTarget()->getTopicName(), $bind->getRoutingKey(), @@ -288,7 +288,7 @@ public function unbind(InteropAmqpBind $bind) ); // bind exchange to queue } else { - $this->getChannel()->queue_unbind( + $this->getLibChannel()->queue_unbind( $bind->getTarget()->getQueueName(), $bind->getSource()->getTopicName(), $bind->getRoutingKey(), @@ -309,7 +309,7 @@ public function close() */ public function setQos($prefetchSize, $prefetchCount, $global) { - $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global); + $this->getLibChannel()->basic_qos($prefetchSize, $prefetchCount, $global); } /** @@ -336,7 +336,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) } }; - $consumerTag = $this->getChannel()->basic_consume( + $consumerTag = $this->getLibChannel()->basic_consume( $consumer->getQueue()->getQueueName(), $consumer->getConsumerTag(), (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL), @@ -366,10 +366,10 @@ public function unsubscribe(InteropAmqpConsumer $consumer) $consumerTag = $consumer->getConsumerTag(); - $this->getChannel()->basic_cancel($consumerTag); + $this->getLibChannel()->basic_cancel($consumerTag); $consumer->setConsumerTag(null); - unset($this->subscribers[$consumerTag], $this->getChannel()->callbacks[$consumerTag]); + unset($this->subscribers[$consumerTag], $this->getLibChannel()->callbacks[$consumerTag]); } /** @@ -407,7 +407,7 @@ public function consume($timeout = 0) /** * @return AMQPChannel */ - private function getChannel() + public function getLibChannel() { if (null === $this->channel) { $this->channel = $this->connection->channel(); @@ -422,11 +422,13 @@ private function getChannel() } /** + * @internal It must be used here and in the consumer only + * * @param LibAMQPMessage $amqpMessage * * @return InteropAmqpMessage */ - private function convertMessage(LibAMQPMessage $amqpMessage) + public function convertMessage(LibAMQPMessage $amqpMessage) { $headers = new AMQPTable($amqpMessage->get_properties()); $headers = $headers->getNativeData(); diff --git a/pkg/amqp-lib/Tests/AmqpConsumerTest.php b/pkg/amqp-lib/Tests/AmqpConsumerTest.php index 46bb138e2..52d083ef1 100644 --- a/pkg/amqp-lib/Tests/AmqpConsumerTest.php +++ b/pkg/amqp-lib/Tests/AmqpConsumerTest.php @@ -3,6 +3,7 @@ namespace Enqueue\AmqpLib\Tests; use Enqueue\AmqpLib\AmqpConsumer; +use Enqueue\AmqpLib\AmqpContext; use Enqueue\AmqpLib\Buffer; use Enqueue\Null\NullMessage; use Enqueue\Test\ClassExtensionTrait; @@ -27,7 +28,7 @@ public function testShouldImplementConsumerInterface() public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments() { new AmqpConsumer( - $this->createChannelMock(), + $this->createContextMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get' @@ -38,14 +39,14 @@ public function testShouldReturnQueue() { $queue = new AmqpQueue('aName'); - $consumer = new AmqpConsumer($this->createChannelMock(), $queue, new Buffer(), 'basic_get'); + $consumer = new AmqpConsumer($this->createContextMock(), $queue, new Buffer(), 'basic_get'); $this->assertSame($queue, $consumer->getQueue()); } public function testOnAcknowledgeShouldThrowExceptionIfNotAmqpMessage() { - $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $consumer = new AmqpConsumer($this->createContextMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but'); @@ -55,7 +56,7 @@ public function testOnAcknowledgeShouldThrowExceptionIfNotAmqpMessage() public function testOnRejectShouldThrowExceptionIfNotAmqpMessage() { - $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $consumer = new AmqpConsumer($this->createContextMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Interop\Amqp\AmqpMessage but'); @@ -65,14 +66,21 @@ public function testOnRejectShouldThrowExceptionIfNotAmqpMessage() public function testOnAcknowledgeShouldAcknowledgeMessage() { - $channel = $this->createChannelMock(); + $channel = $this->createLibChannelMock(); $channel ->expects($this->once()) ->method('basic_ack') ->with('delivery-tag') ; - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getLibChannel') + ->willReturn($channel) + ; + + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); $message = new AmqpMessage(); $message->setDeliveryTag('delivery-tag'); @@ -82,14 +90,21 @@ public function testOnAcknowledgeShouldAcknowledgeMessage() public function testOnRejectShouldRejectMessage() { - $channel = $this->createChannelMock(); + $channel = $this->createLibChannelMock(); $channel ->expects($this->once()) ->method('basic_reject') ->with('delivery-tag', $this->isTrue()) ; - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getLibChannel') + ->willReturn($channel) + ; + + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); $message = new AmqpMessage(); $message->setDeliveryTag('delivery-tag'); @@ -99,87 +114,89 @@ public function testOnRejectShouldRejectMessage() public function testShouldReturnMessageOnReceiveNoWait() { - $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); - $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; - $amqpMessage->delivery_info['routing_key'] = 'routing-key'; - $amqpMessage->delivery_info['redelivered'] = true; - $amqpMessage->delivery_info['routing_key'] = 'routing-key'; + $libMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); + $libMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $libMessage->delivery_info['routing_key'] = 'routing-key'; + $libMessage->delivery_info['redelivered'] = true; + $libMessage->delivery_info['routing_key'] = 'routing-key'; + + $message = new AmqpMessage(); - $channel = $this->createChannelMock(); + $channel = $this->createLibChannelMock(); $channel ->expects($this->once()) ->method('basic_get') - ->willReturn($amqpMessage) + ->willReturn($libMessage) ; - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getLibChannel') + ->willReturn($channel) + ; + $context + ->expects($this->once()) + ->method('convertMessage') + ->with($this->identicalTo($libMessage)) + ->willReturn($message) + ; - $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - $message = $consumer->receiveNoWait(); + $receivedMessage = $consumer->receiveNoWait(); - $this->assertInstanceOf(AmqpMessage::class, $message); - $this->assertSame('body', $message->getBody()); - $this->assertSame('delivery-tag', $message->getDeliveryTag()); - $this->assertSame('routing-key', $message->getRoutingKey()); - $this->assertTrue($message->isRedelivered()); + $this->assertSame($message, $receivedMessage); } public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() { - $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); - $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; - $amqpMessage->delivery_info['routing_key'] = 'routing-key'; - $amqpMessage->delivery_info['redelivered'] = true; + $libMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); + $libMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $libMessage->delivery_info['routing_key'] = 'routing-key'; + $libMessage->delivery_info['redelivered'] = true; + + $message = new AmqpMessage(); - $channel = $this->createChannelMock(); + $channel = $this->createLibChannelMock(); $channel ->expects($this->once()) ->method('basic_get') - ->willReturn($amqpMessage) + ->willReturn($libMessage) ; - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getLibChannel') + ->willReturn($channel) + ; + $context + ->expects($this->once()) + ->method('convertMessage') + ->with($this->identicalTo($libMessage)) + ->willReturn($message) + ; - $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); + $consumer = new AmqpConsumer($context, new AmqpQueue('aName'), new Buffer(), 'basic_get'); - $message = $consumer->receive(); + $receivedMessage = $consumer->receive(); - $this->assertInstanceOf(AmqpMessage::class, $message); - $this->assertSame('body', $message->getBody()); - $this->assertSame('delivery-tag', $message->getDeliveryTag()); - $this->assertSame('routing-key', $message->getRoutingKey()); - $this->assertTrue($message->isRedelivered()); + $this->assertSame($message, $receivedMessage); } - public function testShouldCallExpectedMethodsWhenReceiveWithBasicConsumeMethod() + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + public function createContextMock() { - $channel = $this->createChannelMock(); - $channel - ->expects($this->once()) - ->method('basic_consume') - ->willReturn('consumer-tag') - ; - $channel - ->expects($this->once()) - ->method('wait') - ->willReturnCallback(function () { - usleep(2000); - }); - - $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_consume'); - - $message = new AmqpMessage(); - $message->setDeliveryTag('delivery-tag'); - $consumer->receive(1); + return $this->createMock(AmqpContext::class); } /** * @return \PHPUnit_Framework_MockObject_MockObject|AMQPChannel */ - public function createChannelMock() + public function createLibChannelMock() { return $this->createMock(AMQPChannel::class); } diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php index 12f93a4f1..42bc523ce 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php @@ -3,6 +3,7 @@ namespace Enqueue\AmqpLib\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; +use Enqueue\AmqpLib\AmqpContext; use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -24,6 +25,8 @@ protected function createContext() } /** + * @param AmqpContext $context + * * {@inheritdoc} */ protected function createQueue(PsrContext $context, $queueName) @@ -31,6 +34,7 @@ protected function createQueue(PsrContext $context, $queueName) $queue = parent::createQueue($context, $queueName); $context->declareQueue($queue); + $context->purgeQueue($queue); return $queue; } diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php index ca2284443..e27166695 100644 --- a/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php @@ -3,6 +3,7 @@ namespace Enqueue\AmqpLib\Tests\Spec; use Enqueue\AmqpLib\AmqpConnectionFactory; +use Enqueue\AmqpLib\AmqpContext; use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec; @@ -24,6 +25,8 @@ protected function createContext() } /** + * @param AmqpContext $context + * * {@inheritdoc} */ protected function createQueue(PsrContext $context, $queueName) @@ -31,6 +34,7 @@ protected function createQueue(PsrContext $context, $queueName) $queue = parent::createQueue($context, $queueName); $context->declareQueue($queue); + $context->purgeQueue($queue); return $queue; } diff --git a/pkg/amqp-lib/composer.json b/pkg/amqp-lib/composer.json index 4c4702e06..7d3f2b959 100644 --- a/pkg/amqp-lib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -16,7 +16,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5.2@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json index a5857d800..94c931356 100644 --- a/pkg/dbal/composer.json +++ b/pkg/dbal/composer.json @@ -14,7 +14,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/fs/composer.json b/pkg/fs/composer.json index b9a94b7f1..a76cbaca8 100644 --- a/pkg/fs/composer.json +++ b/pkg/fs/composer.json @@ -15,7 +15,7 @@ "enqueue/enqueue": "^0.7", "enqueue/null": "^0.7", "enqueue/test": "^0.7", - "queue-interop/queue-spec": "^0.5", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3@stable", "symfony/config": "^2.8|^3@stable", "symfony/phpunit-bridge": "^2.8|^3@stable" diff --git a/pkg/gearman/composer.json b/pkg/gearman/composer.json index b3eac2020..4529a1197 100644 --- a/pkg/gearman/composer.json +++ b/pkg/gearman/composer.json @@ -14,7 +14,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/gps/composer.json b/pkg/gps/composer.json index 6f70d8f90..b9540a51c 100644 --- a/pkg/gps/composer.json +++ b/pkg/gps/composer.json @@ -13,7 +13,7 @@ "phpunit/phpunit": "~5.4.0", "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/null/composer.json b/pkg/null/composer.json index f74a00f8d..e24c54c2f 100644 --- a/pkg/null/composer.json +++ b/pkg/null/composer.json @@ -12,7 +12,7 @@ "phpunit/phpunit": "~5.5", "enqueue/enqueue": "^0.8@dev", "enqueue/test": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/pheanstalk/composer.json b/pkg/pheanstalk/composer.json index 4518f3470..542e18ae5 100644 --- a/pkg/pheanstalk/composer.json +++ b/pkg/pheanstalk/composer.json @@ -14,7 +14,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index b02f3cc4c..1014002d1 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -14,7 +14,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "kwn/php-rdkafka-stubs": "^1.0.2" }, "autoload": { diff --git a/pkg/redis/composer.json b/pkg/redis/composer.json index db13a09c2..cb924e286 100644 --- a/pkg/redis/composer.json +++ b/pkg/redis/composer.json @@ -14,7 +14,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/sqs/composer.json b/pkg/sqs/composer.json index 7a5e181e5..c8e3a51c9 100644 --- a/pkg/sqs/composer.json +++ b/pkg/sqs/composer.json @@ -13,7 +13,7 @@ "phpunit/phpunit": "~5.4.0", "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" }, diff --git a/pkg/stomp/composer.json b/pkg/stomp/composer.json index a41fe0333..703cc9fe0 100644 --- a/pkg/stomp/composer.json +++ b/pkg/stomp/composer.json @@ -16,7 +16,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5@dev", + "queue-interop/queue-spec": "^0.5.3@dev", "symfony/dependency-injection": "^2.8|^3", "symfony/config": "^2.8|^3" },