From 94f5e94e457f4768b57de1c8139f4f36a13e06ff Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 6 Aug 2018 14:20:14 +0300 Subject: [PATCH 1/4] add subscription consumer to amqp pkgs. --- pkg/amqp-bunny/AmqpContext.php | 18 +++- pkg/amqp-bunny/composer.json | 4 +- pkg/amqp-ext/AmqpContext.php | 18 +++- pkg/amqp-ext/composer.json | 4 +- pkg/amqp-lib/AmqpContext.php | 18 +++- pkg/amqp-lib/composer.json | 5 +- pkg/amqp-tools/SubscriptionConsumer.php | 54 +++++++++++ .../Tests/SubscriptionConsumerTest.php | 96 +++++++++++++++++++ pkg/amqp-tools/composer.json | 2 +- 9 files changed, 208 insertions(+), 11 deletions(-) create mode 100644 pkg/amqp-tools/SubscriptionConsumer.php create mode 100644 pkg/amqp-tools/Tests/SubscriptionConsumerTest.php diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index bc2aaf9b9..d0730d6b5 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -9,6 +9,7 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Enqueue\AmqpTools\SignalSocketHelper; +use Enqueue\AmqpTools\SubscriptionConsumer; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -22,9 +23,10 @@ use Interop\Queue\Exception; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrDestination; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use Interop\Queue\PsrTopic; -class AmqpContext implements InteropAmqpContext, DelayStrategyAware +class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext { use DelayStrategyAwareTrait; @@ -136,6 +138,14 @@ public function createConsumer(PsrDestination $destination) return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']); } + /** + * {@inheritdoc} + */ + public function createSubscriptionConsumer() + { + return new SubscriptionConsumer($this); + } + /** * @return AmqpProducer */ @@ -323,6 +333,8 @@ public function setQos($prefetchSize, $prefetchCount, $global) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) @@ -366,6 +378,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function unsubscribe(InteropAmqpConsumer $consumer) @@ -382,6 +396,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function consume($timeout = 0) diff --git a/pkg/amqp-bunny/composer.json b/pkg/amqp-bunny/composer.json index dbb76cfa3..15c917900 100644 --- a/pkg/amqp-bunny/composer.json +++ b/pkg/amqp-bunny/composer.json @@ -8,9 +8,9 @@ "require": { "php": ">=5.6", - "queue-interop/amqp-interop": "^0.7@dev", + "queue-interop/amqp-interop": "^0.7.4@dev", "bunny/bunny": "^0.2.4|^0.3|^0.4", - "enqueue/amqp-tools": "^0.8.4@dev" + "enqueue/amqp-tools": "^0.8.24@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index c07ef4485..256f0b19a 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -4,6 +4,7 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; +use Enqueue\AmqpTools\SubscriptionConsumer; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -16,9 +17,10 @@ use Interop\Queue\Exception; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrDestination; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use Interop\Queue\PsrTopic; -class AmqpContext implements InteropAmqpContext, DelayStrategyAware +class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext { use DelayStrategyAwareTrait; @@ -260,6 +262,14 @@ public function createConsumer(PsrDestination $destination) return new AmqpConsumer($this, $destination, $this->buffer, $this->receiveMethod); } + /** + * {@inheritdoc} + */ + public function createSubscriptionConsumer() + { + return new SubscriptionConsumer($this); + } + /** * {@inheritdoc} */ @@ -300,6 +310,8 @@ public function getExtChannel() } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) @@ -319,6 +331,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function unsubscribe(InteropAmqpConsumer $consumer) @@ -337,6 +351,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function consume($timeout = 0) diff --git a/pkg/amqp-ext/composer.json b/pkg/amqp-ext/composer.json index 1b80a9cec..0cf7847cf 100644 --- a/pkg/amqp-ext/composer.json +++ b/pkg/amqp-ext/composer.json @@ -9,8 +9,8 @@ "php": ">=5.6", "ext-amqp": "^1.9.3", - "queue-interop/amqp-interop": "^0.7@dev", - "enqueue/amqp-tools": "^0.8.4@dev" + "queue-interop/amqp-interop": "^0.7.4@dev", + "enqueue/amqp-tools": "^0.8.24@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index cf78d18af..02c9bcb85 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -5,6 +5,7 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; use Enqueue\AmqpTools\SignalSocketHelper; +use Enqueue\AmqpTools\SubscriptionConsumer; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -18,6 +19,7 @@ use Interop\Queue\Exception; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrDestination; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use Interop\Queue\PsrTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; @@ -26,7 +28,7 @@ use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Wire\AMQPTable; -class AmqpContext implements InteropAmqpContext, DelayStrategyAware +class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext { use DelayStrategyAwareTrait; @@ -129,6 +131,14 @@ public function createConsumer(PsrDestination $destination) return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']); } + /** + * {@inheritdoc} + */ + public function createSubscriptionConsumer() + { + return new SubscriptionConsumer($this); + } + /** * @return AmqpProducer */ @@ -316,6 +326,8 @@ public function setQos($prefetchSize, $prefetchCount, $global) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) @@ -359,6 +371,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function unsubscribe(InteropAmqpConsumer $consumer) @@ -376,6 +390,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer) } /** + * @deprecated since 0.8.34 will be removed in 0.9 + * * {@inheritdoc} */ public function consume($timeout = 0) diff --git a/pkg/amqp-lib/composer.json b/pkg/amqp-lib/composer.json index 0a876f11e..dd6df3de4 100644 --- a/pkg/amqp-lib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -8,9 +8,8 @@ "require": { "php": ">=5.6", "php-amqplib/php-amqplib": "^2.7@dev", - "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1", - "queue-interop/amqp-interop": "^0.7@dev", - "enqueue/amqp-tools": "^0.8.5@dev" + "queue-interop/amqp-interop": "^0.7.4@dev", + "enqueue/amqp-tools": "^0.8.24@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/amqp-tools/SubscriptionConsumer.php b/pkg/amqp-tools/SubscriptionConsumer.php new file mode 100644 index 000000000..a207d7b63 --- /dev/null +++ b/pkg/amqp-tools/SubscriptionConsumer.php @@ -0,0 +1,54 @@ +context = $context; + } + + /** + * {@inheritdoc} + */ + public function consume($timeout = 0) + { + $this->context->consume($timeout); + } + + /** + * {@inheritdoc} + */ + public function subscribe(PsrConsumer $consumer, callable $callback) + { + $this->context->subscribe($consumer, $callback); + } + + /** + * {@inheritdoc} + */ + public function unsubscribe(PsrConsumer $consumer) + { + $this->context->unsubscribe($consumer); + } + + /** + * TODO. + * + * {@inheritdoc} + */ + public function unsubscribeAll() + { + throw new \LogicException('Not implemented'); + } +} diff --git a/pkg/amqp-tools/Tests/SubscriptionConsumerTest.php b/pkg/amqp-tools/Tests/SubscriptionConsumerTest.php new file mode 100644 index 000000000..84b380070 --- /dev/null +++ b/pkg/amqp-tools/Tests/SubscriptionConsumerTest.php @@ -0,0 +1,96 @@ +assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + { + new SubscriptionConsumer($this->createContext()); + } + + public function testShouldProxySubscribeCallToContextMethod() + { + $consumer = $this->createConsumer(); + $callback = function () {}; + + $context = $this->createContext(); + $context + ->expects($this->once()) + ->method('subscribe') + ->with($this->identicalTo($consumer), $this->identicalTo($callback)) + ; + + $subscriptionConsumer = new SubscriptionConsumer($context); + $subscriptionConsumer->subscribe($consumer, $callback); + } + + public function testShouldProxyUnsubscribeCallToContextMethod() + { + $consumer = $this->createConsumer(); + + $context = $this->createContext(); + $context + ->expects($this->once()) + ->method('unsubscribe') + ->with($this->identicalTo($consumer)) + ; + + $subscriptionConsumer = new SubscriptionConsumer($context); + $subscriptionConsumer->unsubscribe($consumer); + } + + public function testShouldProxyConsumeCallToContextMethod() + { + $timeout = 123.456; + + $context = $this->createContext(); + $context + ->expects($this->once()) + ->method('consume') + ->with($this->identicalTo($timeout)) + ; + + $subscriptionConsumer = new SubscriptionConsumer($context); + $subscriptionConsumer->consume($timeout); + } + + public function testThrowsNotImplementedOnUnsubscribeAllCall() + { + $context = $this->createContext(); + + $subscriptionConsumer = new SubscriptionConsumer($context); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Not implemented'); + $subscriptionConsumer->unsubscribeAll(); + } + + /** + * @return AmqpConsumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumer() + { + return $this->createMock(AmqpConsumer::class); + } + + /** + * @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createContext() + { + return $this->createMock(AmqpContext::class); + } +} diff --git a/pkg/amqp-tools/composer.json b/pkg/amqp-tools/composer.json index 4d03bda67..eecd90d6e 100644 --- a/pkg/amqp-tools/composer.json +++ b/pkg/amqp-tools/composer.json @@ -8,7 +8,7 @@ "require": { "php": ">=5.6", "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1", - "queue-interop/amqp-interop": "^0.7@dev" + "queue-interop/amqp-interop": "^0.7.4@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", From 2697695c1c71465f9f4575ef1c8af271015bbd2d Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 6 Aug 2018 16:20:20 +0300 Subject: [PATCH 2/4] add fallback subscription consumer, add subscription consumer support to queue consumer. --- .../FallbackSubscriptionConsumer.php | 124 ++++++++ pkg/enqueue/Consumption/QueueConsumer.php | 79 +++++- .../FallbackSubscriptionConsumerTest.php | 267 ++++++++++++++++++ 3 files changed, 467 insertions(+), 3 deletions(-) create mode 100644 pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php create mode 100644 pkg/enqueue/Tests/Consumption/FallbackSubscriptionConsumerTest.php diff --git a/pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php b/pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php new file mode 100644 index 000000000..f4176a696 --- /dev/null +++ b/pkg/enqueue/Consumption/FallbackSubscriptionConsumer.php @@ -0,0 +1,124 @@ +subscribers = []; + } + + /** + * {@inheritdoc} + */ + public function consume($timeout = 0) + { + if (empty($this->subscribers)) { + throw new \LogicException('No subscribers'); + } + + $timeout /= 1000; + $endAt = microtime(true) + $timeout; + + while (true) { + /** + * @var string + * @var PsrConsumer $consumer + * @var callable $processor + */ + foreach ($this->subscribers as $queueName => list($consumer, $callback)) { + $message = $consumer->receiveNoWait(); + + if ($message) { + if (false === call_user_func($callback, $message, $consumer)) { + return; + } + } else { + if ($timeout && microtime(true) >= $endAt) { + return; + } + + $this->idleTime && usleep($this->idleTime); + } + + if ($timeout && microtime(true) >= $endAt) { + return; + } + } + } + } + + /** + * {@inheritdoc} + */ + public function subscribe(PsrConsumer $consumer, callable $callback) + { + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * {@inheritdoc} + */ + public function unsubscribe(PsrConsumer $consumer) + { + if (false == array_key_exists($consumer->getQueue()->getQueueName(), $this->subscribers)) { + return; + } + + if ($this->subscribers[$consumer->getQueue()->getQueueName()][0] !== $consumer) { + return; + } + + unset($this->subscribers[$consumer->getQueue()->getQueueName()]); + } + + /** + * {@inheritdoc} + */ + public function unsubscribeAll() + { + $this->subscribers = []; + } + + /** + * @return float|int + */ + public function getIdleTime() + { + return $this->idleTime; + } + + /** + * @param float|int $idleTime + */ + public function setIdleTime($idleTime) + { + $this->idleTime = $idleTime; + } +} diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index 7f484044e..c41028b91 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -14,6 +14,7 @@ use Interop\Queue\PsrMessage; use Interop\Queue\PsrProcessor; use Interop\Queue\PsrQueue; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -58,6 +59,13 @@ class QueueConsumer */ private $logger; + /** + * @deprecated added as BC layer, will be a default behavior in 0.9 version. + * + * @var bool + */ + private $enableSubscriptionConsumer; + /** * @param PsrContext $psrContext * @param ExtensionInterface|ChainExtension|null $extension @@ -77,6 +85,8 @@ public function __construct( $this->boundProcessors = []; $this->logger = new NullLogger(); + + $this->enableSubscriptionConsumer = false; } /** @@ -187,7 +197,45 @@ public function consume(ExtensionInterface $runtimeExtension = null) $this->logger->info('Start consuming'); - if ($this->psrContext instanceof AmqpContext) { + $subscriptionConsumer = null; + if ($this->enableSubscriptionConsumer) { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + if ($context instanceof PsrSubscriptionConsumerAwareContext) { + $subscriptionConsumer = $context->createSubscriptionConsumer(); + } + + $callback = function (PsrMessage $message, PsrConsumer $consumer) use (&$context) { + $currentProcessor = null; + + /** @var PsrQueue $queue */ + foreach ($this->boundProcessors as list($queue, $processor)) { + if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) { + $currentProcessor = $processor; + } + } + + if (false == $currentProcessor) { + throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName())); + } + + $context = new Context($this->psrContext); + $context->setLogger($this->logger); + $context->setPsrQueue($consumer->getQueue()); + $context->setPsrConsumer($consumer); + $context->setPsrProcessor($currentProcessor); + $context->setPsrMessage($message); + + $this->doConsume($this->extension, $context); + + return true; + }; + + foreach ($consumers as $consumer) { + /* @var AmqpConsumer $consumer */ + + $subscriptionConsumer->subscribe($consumer, $callback); + } + } elseif ($this->psrContext instanceof AmqpContext) { $callback = function (AmqpMessage $message, AmqpConsumer $consumer) use (&$context) { $currentProcessor = null; @@ -223,7 +271,18 @@ public function consume(ExtensionInterface $runtimeExtension = null) while (true) { try { - if ($this->psrContext instanceof AmqpContext) { + if ($this->enableSubscriptionConsumer) { + $this->extension->onBeforeReceive($context); + + if ($context->isExecutionInterrupted()) { + throw new ConsumptionInterruptedException(); + } + + $subscriptionConsumer->consume($this->receiveTimeout); + + usleep($this->idleTimeout * 1000); + $this->extension->onIdle($context); + } elseif ($this->psrContext instanceof AmqpContext) { $this->extension->onBeforeReceive($context); if ($context->isExecutionInterrupted()) { @@ -251,7 +310,13 @@ public function consume(ExtensionInterface $runtimeExtension = null) } catch (ConsumptionInterruptedException $e) { $this->logger->info(sprintf('Consuming interrupted')); - if ($this->psrContext instanceof AmqpContext) { + if ($this->enableSubscriptionConsumer) { + foreach ($consumers as $consumer) { + /* @var PsrConsumer $consumer */ + + $subscriptionConsumer->unsubscribe($consumer); + } + } elseif ($this->psrContext instanceof AmqpContext) { foreach ($consumers as $consumer) { /* @var AmqpConsumer $consumer */ @@ -279,6 +344,14 @@ public function consume(ExtensionInterface $runtimeExtension = null) } } + /** + * @param bool $enableSubscriptionConsumer + */ + public function enableSubscriptionConsumer(bool $enableSubscriptionConsumer) + { + $this->enableSubscriptionConsumer = $enableSubscriptionConsumer; + } + /** * @param ExtensionInterface $extension * @param Context $context diff --git a/pkg/enqueue/Tests/Consumption/FallbackSubscriptionConsumerTest.php b/pkg/enqueue/Tests/Consumption/FallbackSubscriptionConsumerTest.php new file mode 100644 index 000000000..b054180ed --- /dev/null +++ b/pkg/enqueue/Tests/Consumption/FallbackSubscriptionConsumerTest.php @@ -0,0 +1,267 @@ +assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new FallbackSubscriptionConsumer(); + } + + public function testShouldInitSubscribersPropertyWithEmptyArray() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $this->assertAttributeSame([], 'subscribers', $subscriptionConsumer); + } + + public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + + $this->assertAttributeSame([ + 'foo_queue' => [$fooConsumer, $fooCallback], + 'bar_queue' => [$barConsumer, $barCallback], + ], 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"'); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + } + + public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + } + + public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + $subscriptionConsumer->subscribe($barConsumer, function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($fooConsumer); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribeAll(); + + $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer); + } + + public function testShouldConsumeMessagesFromTwoQueuesInExpectedOrder() + { + $firstMessage = $this->createMessageStub('first'); + $secondMessage = $this->createMessageStub('second'); + $thirdMessage = $this->createMessageStub('third'); + $fourthMessage = $this->createMessageStub('fourth'); + $fifthMessage = $this->createMessageStub('fifth'); + + $fooMessages = [null, $firstMessage, null, $secondMessage, $thirdMessage]; + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $fooConsumer + ->expects($this->any()) + ->method('receiveNoWait') + ->willReturnCallback(function () use (&$fooMessages) { + if (empty($fooMessages)) { + return null; + } + + return array_shift($fooMessages); + }) + ; + + $barMessages = [$fourthMessage, null, null, $fifthMessage]; + + $barConsumer = $this->createConsumerStub('bar_queue'); + $barConsumer + ->expects($this->any()) + ->method('receiveNoWait') + ->willReturnCallback(function () use (&$barMessages) { + if (empty($barMessages)) { + return null; + } + + return array_shift($barMessages); + }) + ; + + $actualOrder = []; + $callback = function (PsrMessage $message, PsrConsumer $consumer) use (&$actualOrder) { + $actualOrder[] = [$message->getBody(), $consumer->getQueue()->getQueueName()]; + }; + + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $subscriptionConsumer->subscribe($fooConsumer, $callback); + $subscriptionConsumer->subscribe($barConsumer, $callback); + + $subscriptionConsumer->consume(100); + + $this->assertEquals([ + ['fourth', 'bar_queue'], + ['first', 'foo_queue'], + ['second', 'foo_queue'], + ['fifth', 'bar_queue'], + ['third', 'foo_queue'], + ], $actualOrder); + } + + public function testThrowsIfTryConsumeWithoutSubscribers() + { + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('No subscribers'); + $subscriptionConsumer->consume(); + } + + public function testShouldConsumeTillTimeoutIsReached() + { + $fooConsumer = $this->createConsumerStub('foo_queue'); + $fooConsumer + ->expects($this->any()) + ->method('receiveNoWait') + ->willReturn(null) + ; + + $subscriptionConsumer = new FallbackSubscriptionConsumer(); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + + $startAt = microtime(true); + $subscriptionConsumer->consume(500); + $endAt = microtime(true); + + $this->assertGreaterThan(0.49, $endAt - $startAt); + } + + /** + * @param null|mixed $body + * + * @return PsrMessage|\PHPUnit_Framework_MockObject_MockObject + */ + private function createMessageStub($body = null) + { + $messageMock = $this->createMock(PsrMessage::class); + $messageMock + ->expects($this->any()) + ->method('getBody') + ->willReturn($body) + ; + + return $messageMock; + } + + /** + * @param null|mixed $queueName + * + * @return PsrConsumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub($queueName = null) + { + $queueMock = $this->createMock(PsrQueue::class); + $queueMock + ->expects($this->any()) + ->method('getQueueName') + ->willReturn($queueName); + + $consumerMock = $this->createMock(PsrConsumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queueMock) + ; + + return $consumerMock; + } +} From 2603b9a2de06e44e5bc45cd12e9b97cfb5c47a5b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 6 Aug 2018 16:38:38 +0300 Subject: [PATCH 3/4] add a parameter to enable subscription consumer --- pkg/amqp-tools/SubscriptionConsumer.php | 3 +++ pkg/enqueue-bundle/Resources/config/services.yml | 11 +++++++++-- pkg/enqueue-bundle/composer.json | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/amqp-tools/SubscriptionConsumer.php b/pkg/amqp-tools/SubscriptionConsumer.php index a207d7b63..e992a6b06 100644 --- a/pkg/amqp-tools/SubscriptionConsumer.php +++ b/pkg/amqp-tools/SubscriptionConsumer.php @@ -6,6 +6,9 @@ use Interop\Queue\PsrConsumer; use Interop\Queue\PsrSubscriptionConsumer; +/** + * @deprecated this is BC layer, will be removed in 0.9 + */ final class SubscriptionConsumer implements PsrSubscriptionConsumer { /** diff --git a/pkg/enqueue-bundle/Resources/config/services.yml b/pkg/enqueue-bundle/Resources/config/services.yml index 452ab315b..5ed953077 100644 --- a/pkg/enqueue-bundle/Resources/config/services.yml +++ b/pkg/enqueue-bundle/Resources/config/services.yml @@ -1,3 +1,8 @@ +parameters: + enqueue.queue_consumer.enable_subscription_consumer: false + enqueue.queue_consumer.default_idle_time: 0 + enqueue.queue_consumer.default_receive_timeout: 10 + services: enqueue.consumption.extensions: class: 'Enqueue\Consumption\ChainExtension' @@ -11,8 +16,10 @@ services: arguments: - '@enqueue.transport.context' - '@enqueue.consumption.extensions' - - ~ - - ~ + - '%enqueue.queue_consumer.default_idle_time%' + - '%enqueue.queue_consumer.default_receive_timeout%' + calls: + - ['enableSubscriptionConsumer', ['%enqueue.queue_consumer.enable_subscription_consumer%']] # Deprecated. To be removed in 0.10. enqueue.consumption.queue_consumer: diff --git a/pkg/enqueue-bundle/composer.json b/pkg/enqueue-bundle/composer.json index 11d7d220d..eb56e3e0e 100644 --- a/pkg/enqueue-bundle/composer.json +++ b/pkg/enqueue-bundle/composer.json @@ -8,7 +8,7 @@ "require": { "php": ">=5.6", "symfony/framework-bundle": "^2.8|^3|^4", - "enqueue/enqueue": "^0.8@dev", + "enqueue/enqueue": "^0.8.34@dev", "enqueue/null": "^0.8@dev", "enqueue/async-event-dispatcher": "^0.8@dev" }, From cb865a9461322e00f225e6bb973d4a0d1194d1cc Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 6 Aug 2018 18:24:56 +0300 Subject: [PATCH 4/4] reset changes in composer.json --- pkg/amqp-bunny/composer.json | 4 ++-- pkg/amqp-ext/composer.json | 4 ++-- pkg/amqp-lib/composer.json | 4 ++-- pkg/amqp-tools/composer.json | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/amqp-bunny/composer.json b/pkg/amqp-bunny/composer.json index 15c917900..7303d7adf 100644 --- a/pkg/amqp-bunny/composer.json +++ b/pkg/amqp-bunny/composer.json @@ -8,9 +8,9 @@ "require": { "php": ">=5.6", - "queue-interop/amqp-interop": "^0.7.4@dev", + "queue-interop/amqp-interop": "^0.7@dev", "bunny/bunny": "^0.2.4|^0.3|^0.4", - "enqueue/amqp-tools": "^0.8.24@dev" + "enqueue/amqp-tools": "^0.8@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/amqp-ext/composer.json b/pkg/amqp-ext/composer.json index 0cf7847cf..62ec0e45a 100644 --- a/pkg/amqp-ext/composer.json +++ b/pkg/amqp-ext/composer.json @@ -9,8 +9,8 @@ "php": ">=5.6", "ext-amqp": "^1.9.3", - "queue-interop/amqp-interop": "^0.7.4@dev", - "enqueue/amqp-tools": "^0.8.24@dev" + "queue-interop/amqp-interop": "^0.7@dev", + "enqueue/amqp-tools": "^0.8@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/amqp-lib/composer.json b/pkg/amqp-lib/composer.json index dd6df3de4..07784a317 100644 --- a/pkg/amqp-lib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -8,8 +8,8 @@ "require": { "php": ">=5.6", "php-amqplib/php-amqplib": "^2.7@dev", - "queue-interop/amqp-interop": "^0.7.4@dev", - "enqueue/amqp-tools": "^0.8.24@dev" + "queue-interop/amqp-interop": "^0.7@dev", + "enqueue/amqp-tools": "^0.8@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0", diff --git a/pkg/amqp-tools/composer.json b/pkg/amqp-tools/composer.json index eecd90d6e..4d03bda67 100644 --- a/pkg/amqp-tools/composer.json +++ b/pkg/amqp-tools/composer.json @@ -8,7 +8,7 @@ "require": { "php": ">=5.6", "queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1", - "queue-interop/amqp-interop": "^0.7.4@dev" + "queue-interop/amqp-interop": "^0.7@dev" }, "require-dev": { "phpunit/phpunit": "~5.4.0",