From 63ee2fce4c5c35545ad2a7907eb4ea42170d52e0 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 02:56:40 +0300 Subject: [PATCH 1/5] MongoDB Subscription Consumer feature --- pkg/mongodb/MongodbConsumer.php | 18 +- pkg/mongodb/MongodbContext.php | 3 +- pkg/mongodb/MongodbMessage.php | 16 ++ pkg/mongodb/MongodbSubscriptionConsumer.php | 134 +++++++++++++ .../Tests/MongodbSubscriptionConsumerTest.php | 177 ++++++++++++++++++ ...umerConsumeFromAllSubscribedQueuesTest.php | 44 +++++ ...onConsumerConsumeUntilUnsubscribedTest.php | 44 +++++ ...odbSubscriptionConsumerStopOnFalseTest.php | 44 +++++ pkg/redis/RedisSubscriptionConsumer.php | 5 - .../RedisConnectionFactoryConfigTest.php | 2 +- .../Tests/RedisSubscriptionConsumerTest.php | 2 +- pkg/test/MongodbExtensionTrait.php | 2 + 12 files changed, 466 insertions(+), 25 deletions(-) create mode 100644 pkg/mongodb/MongodbSubscriptionConsumer.php create mode 100644 pkg/mongodb/Tests/MongodbSubscriptionConsumerTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php create mode 100644 pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php diff --git a/pkg/mongodb/MongodbConsumer.php b/pkg/mongodb/MongodbConsumer.php index 062b4d275..b6e80a71c 100644 --- a/pkg/mongodb/MongodbConsumer.php +++ b/pkg/mongodb/MongodbConsumer.php @@ -115,7 +115,7 @@ public function reject(Message $message, bool $requeue = false): void } } - protected function receiveMessage(): ?MongodbMessage + private function receiveMessage(): ?MongodbMessage { $now = time(); $collection = $this->context->getCollection(); @@ -137,23 +137,9 @@ protected function receiveMessage(): ?MongodbMessage return null; } if (empty($message['time_to_live']) || $message['time_to_live'] > time()) { - return $this->convertMessage($message); + return MongodbMessage::fromArrayDbResult($message); } return null; } - - protected function convertMessage(array $mongodbMessage): MongodbMessage - { - $properties = JSON::decode($mongodbMessage['properties']); - $headers = JSON::decode($mongodbMessage['headers']); - - $message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers); - $message->setId((string) $mongodbMessage['_id']); - $message->setPriority((int) $mongodbMessage['priority']); - $message->setRedelivered((bool) $mongodbMessage['redelivered']); - $message->setPublishedAt((int) $mongodbMessage['published_at']); - - return $message; - } } diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php index f5f9049e7..9a9415b80 100644 --- a/pkg/mongodb/MongodbContext.php +++ b/pkg/mongodb/MongodbContext.php @@ -8,7 +8,6 @@ use Interop\Queue\Context; use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; -use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; @@ -107,7 +106,7 @@ public function close(): void public function createSubscriptionConsumer(): SubscriptionConsumer { - throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt(); + return new MongodbSubscriptionConsumer($this); } /** diff --git a/pkg/mongodb/MongodbMessage.php b/pkg/mongodb/MongodbMessage.php index fbfbd75d6..0ceeaa4f4 100644 --- a/pkg/mongodb/MongodbMessage.php +++ b/pkg/mongodb/MongodbMessage.php @@ -65,6 +65,22 @@ public function __construct(string $body = '', array $properties = [], array $he $this->redelivered = false; } + public static function fromArrayDbResult(array $arrayResult): self + { + $message = new self( + $arrayResult['body'], + JSON::decode($arrayResult['properties']), + JSON::decode($arrayResult['headers']) + ); + + $message->setId((string) $arrayResult['_id']); + $message->setPriority((int) $arrayResult['priority']); + $message->setRedelivered((bool) $arrayResult['redelivered']); + $message->setPublishedAt((int) $arrayResult['published_at']); + + return $message; + } + public function setId(string $id = null): void { $this->id = $id; diff --git a/pkg/mongodb/MongodbSubscriptionConsumer.php b/pkg/mongodb/MongodbSubscriptionConsumer.php new file mode 100644 index 000000000..717671682 --- /dev/null +++ b/pkg/mongodb/MongodbSubscriptionConsumer.php @@ -0,0 +1,134 @@ +context = $context; + $this->subscribers = []; + } + + public function consume(int $timeout = 0): void + { + if (empty($this->subscribers)) { + throw new \LogicException('No subscribers'); + } + + $timeout = (int) ceil($timeout / 1000); + $endAt = time() + $timeout; + + $queueNames = []; + foreach (array_keys($this->subscribers) as $queueName) { + $queueNames[$queueName] = $queueName; + } + + $currentQueueNames = []; + while (true) { + if (empty($currentQueueNames)) { + $currentQueueNames = $queueNames; + } + + $result = $this->context->getCollection()->findOneAndDelete( + [ + 'queue' => ['$in' => array_keys($currentQueueNames)], + '$or' => [ + ['delayed_until' => ['$exists' => false]], + ['delayed_until' => ['$lte' => time()]], + ], + ], + [ + 'sort' => ['priority' => -1, 'published_at' => 1], + 'typeMap' => ['root' => 'array', 'document' => 'array'], + ] + ); + + if ($result) { + list($consumer, $callback) = $this->subscribers[$result['queue']]; + + $message = MongodbMessage::fromArrayDbResult($result); + + if (false === call_user_func($callback, $message, $consumer)) { + return; + } + + unset($currentQueueNames[$result['queue']]); + } else { + $currentQueueNames = []; + } + + if ($timeout && microtime(true) >= $endAt) { + return; + } + } + } + + /** + * @param MongodbConsumer $consumer + */ + public function subscribe(Consumer $consumer, callable $callback): void + { + if (false == $consumer instanceof MongodbConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + if (array_key_exists($queueName, $this->subscribers)) { + if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { + return; + } + + throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); + } + + $this->subscribers[$queueName] = [$consumer, $callback]; + } + + /** + * @param MongodbConsumer $consumer + */ + public function unsubscribe(Consumer $consumer): void + { + if (false == $consumer instanceof MongodbConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer))); + } + + $queueName = $consumer->getQueue()->getQueueName(); + + if (false == array_key_exists($queueName, $this->subscribers)) { + return; + } + + if ($this->subscribers[$queueName][0] !== $consumer) { + return; + } + + unset($this->subscribers[$queueName]); + } + + public function unsubscribeAll(): void + { + $this->subscribers = []; + } +} diff --git a/pkg/mongodb/Tests/MongodbSubscriptionConsumerTest.php b/pkg/mongodb/Tests/MongodbSubscriptionConsumerTest.php new file mode 100644 index 000000000..88899c7bb --- /dev/null +++ b/pkg/mongodb/Tests/MongodbSubscriptionConsumerTest.php @@ -0,0 +1,177 @@ +assertTrue($rc->implementsInterface(SubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithMongodbContextAsFirstArgument() + { + new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + } + + public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + + $this->assertAttributeSame([ + 'foo_queue' => [$fooConsumer, $fooCallback], + 'bar_queue' => [$barConsumer, $barCallback], + ], 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $barCallback = function () {}; + $barConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"'); + $subscriptionConsumer->subscribe($barConsumer, $barCallback); + } + + public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooCallback = function () {}; + $fooConsumer = $this->createConsumerStub('foo_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + $subscriptionConsumer->subscribe($fooConsumer, $fooCallback); + } + + public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $fooConsumer = $this->createConsumerStub('foo_queue'); + $barConsumer = $this->createConsumerStub('bar_queue'); + + $subscriptionConsumer->subscribe($fooConsumer, function () {}); + $subscriptionConsumer->subscribe($barConsumer, function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($fooConsumer); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + + // guard + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue')); + + $this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer); + } + + public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {}); + $subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {}); + + // guard + $this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer); + + $subscriptionConsumer->unsubscribeAll(); + + $this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer); + } + + public function testThrowsIfTryConsumeWithoutSubscribers() + { + $subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock()); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('No subscribers'); + + $subscriptionConsumer->consume(); + } + + /** + * @return MongodbContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createMongodbContextMock() + { + return $this->createMock(MongodbContext::class); + } + + /** + * @param null|mixed $queueName + * + * @return Consumer|\PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub($queueName = null) + { + $queueMock = $this->createMock(Queue::class); + $queueMock + ->expects($this->any()) + ->method('getQueueName') + ->willReturn($queueName); + + $consumerMock = $this->createMock(MongodbConsumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queueMock) + ; + + return $consumerMock; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php new file mode 100644 index 000000000..664990a68 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -0,0 +1,44 @@ +buildMongodbContext(); + } + + /** + * @param MongodbContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + /** @var MongodbDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php new file mode 100644 index 000000000..1071c1267 --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -0,0 +1,44 @@ +buildMongodbContext(); + } + + /** + * @param MongodbContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + /** @var MongodbDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php new file mode 100644 index 000000000..321e16bba --- /dev/null +++ b/pkg/mongodb/Tests/Spec/MongodbSubscriptionConsumerStopOnFalseTest.php @@ -0,0 +1,44 @@ +buildMongodbContext(); + } + + /** + * @param MongodbContext $context + * + * {@inheritdoc} + */ + protected function createQueue(Context $context, $queueName) + { + /** @var MongodbDestination $queue */ + $queue = parent::createQueue($context, $queueName); + $context->getClient()->dropDatabase($queueName); + + return $queue; + } +} diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index 6e1a24dc0..1b6cd1149 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -50,11 +50,6 @@ public function consume(int $timeout = 0): void $currentQueueNames = $queueNames; } - /** - * @var string - * @var Consumer $consumer - * @var callable $processor - */ $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout ?: 5); if ($result) { $message = RedisMessage::jsonUnserialize($result->getMessage()); diff --git a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php index e2f007737..953006af9 100644 --- a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php +++ b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php @@ -22,7 +22,7 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig() new RedisConnectionFactory(new \stdClass()); } - public function testThrowIfSchemeIsNotAmqp() + public function testThrowIfSchemeIsNotRedis() { $this->expectException(\LogicException::class); $this->expectExceptionMessage('The given scheme protocol "http" is not supported. It must be one of "redis", "rediss", "tcp", "tls", "unix"'); diff --git a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php index 98c6142f9..12c377500 100644 --- a/pkg/redis/Tests/RedisSubscriptionConsumerTest.php +++ b/pkg/redis/Tests/RedisSubscriptionConsumerTest.php @@ -19,7 +19,7 @@ public function testShouldImplementSubscriptionConsumerInterface() $this->assertTrue($rc->implementsInterface(SubscriptionConsumer::class)); } - public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + public function testCouldBeConstructedWithRedisContextAsFirstArgument() { new RedisSubscriptionConsumer($this->createRedisContextMock()); } diff --git a/pkg/test/MongodbExtensionTrait.php b/pkg/test/MongodbExtensionTrait.php index 29c146c1e..3ba9e93e0 100644 --- a/pkg/test/MongodbExtensionTrait.php +++ b/pkg/test/MongodbExtensionTrait.php @@ -1,5 +1,7 @@ Date: Thu, 18 Oct 2018 11:49:44 +0300 Subject: [PATCH 2/5] Replaced MongodbMessage::fromArrrayDbResult() with MongodbContext::convertMessage() --- pkg/mongodb/MongodbConsumer.php | 2 +- pkg/mongodb/MongodbContext.php | 19 +++++++++++++++ pkg/mongodb/MongodbMessage.php | 16 ------------- pkg/mongodb/MongodbSubscriptionConsumer.php | 2 +- pkg/mongodb/Tests/MongodbContextTest.php | 26 +++++++++++++++++++++ 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/pkg/mongodb/MongodbConsumer.php b/pkg/mongodb/MongodbConsumer.php index b6e80a71c..494210573 100644 --- a/pkg/mongodb/MongodbConsumer.php +++ b/pkg/mongodb/MongodbConsumer.php @@ -137,7 +137,7 @@ private function receiveMessage(): ?MongodbMessage return null; } if (empty($message['time_to_live']) || $message['time_to_live'] > time()) { - return MongodbMessage::fromArrayDbResult($message); + return $this->context->convertMessage($message); } return null; diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php index 9a9415b80..b0beb98b3 100644 --- a/pkg/mongodb/MongodbContext.php +++ b/pkg/mongodb/MongodbContext.php @@ -109,6 +109,25 @@ public function createSubscriptionConsumer(): SubscriptionConsumer return new MongodbSubscriptionConsumer($this); } + /** + * @internal It must be used here and in the consumer only + */ + public function convertMessage(array $mongodbMessage): MongodbMessage + { + $mongodbMessageObj = new MongodbMessage( + $mongodbMessage['body'], + JSON::decode($mongodbMessage['properties']), + JSON::decode($mongodbMessage['headers']) + ); + + $mongodbMessageObj->setId((string) $mongodbMessage['_id']); + $mongodbMessageObj->setPriority((int) $mongodbMessage['priority']); + $mongodbMessageObj->setRedelivered((bool) $mongodbMessage['redelivered']); + $mongodbMessageObj->setPublishedAt((int) $mongodbMessage['published_at']); + + return $mongodbMessageObj; + } + /** * @param MongodbDestination $queue */ diff --git a/pkg/mongodb/MongodbMessage.php b/pkg/mongodb/MongodbMessage.php index 0ceeaa4f4..fbfbd75d6 100644 --- a/pkg/mongodb/MongodbMessage.php +++ b/pkg/mongodb/MongodbMessage.php @@ -65,22 +65,6 @@ public function __construct(string $body = '', array $properties = [], array $he $this->redelivered = false; } - public static function fromArrayDbResult(array $arrayResult): self - { - $message = new self( - $arrayResult['body'], - JSON::decode($arrayResult['properties']), - JSON::decode($arrayResult['headers']) - ); - - $message->setId((string) $arrayResult['_id']); - $message->setPriority((int) $arrayResult['priority']); - $message->setRedelivered((bool) $arrayResult['redelivered']); - $message->setPublishedAt((int) $arrayResult['published_at']); - - return $message; - } - public function setId(string $id = null): void { $this->id = $id; diff --git a/pkg/mongodb/MongodbSubscriptionConsumer.php b/pkg/mongodb/MongodbSubscriptionConsumer.php index 717671682..1dd7734ea 100644 --- a/pkg/mongodb/MongodbSubscriptionConsumer.php +++ b/pkg/mongodb/MongodbSubscriptionConsumer.php @@ -67,7 +67,7 @@ public function consume(int $timeout = 0): void if ($result) { list($consumer, $callback) = $this->subscribers[$result['queue']]; - $message = MongodbMessage::fromArrayDbResult($result); + $message = $this->context->convertMessage($result); if (false === call_user_func($callback, $message, $consumer)) { return; diff --git a/pkg/mongodb/Tests/MongodbContextTest.php b/pkg/mongodb/Tests/MongodbContextTest.php index e23f1b73a..7a795c402 100644 --- a/pkg/mongodb/Tests/MongodbContextTest.php +++ b/pkg/mongodb/Tests/MongodbContextTest.php @@ -70,6 +70,32 @@ public function testShouldCreateMessage() $this->assertFalse($message->isRedelivered()); } + public function testShouldConvertFromArrayToMongodbMessage() + { + $arrayData = [ + '_id' => 'stringId', + 'body' => 'theBody', + 'properties' => json_encode(['barProp' => 'barPropVal']), + 'headers' => json_encode(['fooHeader' => 'fooHeaderVal']), + 'priority' => '12', + 'published_at' => 1525935820, + 'redelivered' => false, + ]; + + $context = new MongodbContext($this->createClientMock()); + $message = $context->convertMessage($arrayData); + + $this->assertInstanceOf(MongodbMessage::class, $message); + + $this->assertEquals('stringId', $message->getId()); + $this->assertEquals('theBody', $message->getBody()); + $this->assertEquals(['barProp' => 'barPropVal'], $message->getProperties()); + $this->assertEquals(['fooHeader' => 'fooHeaderVal'], $message->getHeaders()); + $this->assertEquals(12, $message->getPriority()); + $this->assertEquals(1525935820, $message->getPublishedAt()); + $this->assertFalse($message->isRedelivered()); + } + public function testShouldCreateTopic() { $context = new MongodbContext($this->createClientMock()); From d04981745cfcbacc0a0ed91d5c0bcbf7f323efe9 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 17:04:37 +0300 Subject: [PATCH 3/5] Replaced constructor with owned method createMessage() in MongodbContext. --- pkg/mongodb/MongodbContext.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mongodb/MongodbContext.php b/pkg/mongodb/MongodbContext.php index b0beb98b3..968c98b1e 100644 --- a/pkg/mongodb/MongodbContext.php +++ b/pkg/mongodb/MongodbContext.php @@ -114,7 +114,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer */ public function convertMessage(array $mongodbMessage): MongodbMessage { - $mongodbMessageObj = new MongodbMessage( + $mongodbMessageObj = $this->createMessage( $mongodbMessage['body'], JSON::decode($mongodbMessage['properties']), JSON::decode($mongodbMessage['headers']) From d0ea85f623010b259670b8b26e03f0dcf62873b3 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 18:42:01 +0300 Subject: [PATCH 4/5] Added usleep 200ms when no messages are recieved --- pkg/mongodb/MongodbSubscriptionConsumer.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/mongodb/MongodbSubscriptionConsumer.php b/pkg/mongodb/MongodbSubscriptionConsumer.php index 1dd7734ea..59063dd82 100644 --- a/pkg/mongodb/MongodbSubscriptionConsumer.php +++ b/pkg/mongodb/MongodbSubscriptionConsumer.php @@ -76,6 +76,8 @@ public function consume(int $timeout = 0): void unset($currentQueueNames[$result['queue']]); } else { $currentQueueNames = []; + + usleep(200000); // 200ms } if ($timeout && microtime(true) >= $endAt) { From 4a6295776c2d5dcb2f1490ae1b4b0dd654b1470a Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Thu, 18 Oct 2018 18:47:12 +0300 Subject: [PATCH 5/5] Added Subscription consumer to docs --- docs/transport/mongodb.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/docs/transport/mongodb.md b/docs/transport/mongodb.md index 0904c11ed..610580478 100644 --- a/docs/transport/mongodb.md +++ b/docs/transport/mongodb.md @@ -10,6 +10,7 @@ Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker. * [Send expiration message](#send-expiration-message) * [Send delayed message](#send-delayed-message) * [Consume message](#consume-message) +* [Subscription consumer](#subscription-consumer) ## Installation @@ -139,4 +140,37 @@ $consumer->acknowledge($message); // $consumer->reject($message); ``` +## Subscription consumer + +```php +createConsumer($fooQueue); +$barConsumer = $psrContext->createConsumer($barQueue); + +$subscriptionConsumer = $psrContext->createSubscriptionConsumer(); +$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); +$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) { + // process message + + $consumer->acknowledge($message); + + return true; +}); + +$subscriptionConsumer->consume(2000); // 2 sec +``` + [back to index](../index.md)