From f52be4bbc2887aa0b013a1f9a3e493500cc6ce89 Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 30 Sep 2021 15:21:39 +0200 Subject: [PATCH 1/5] just work on the next message based on priority and published_at --- pkg/dbal/DbalSubscriptionConsumer.php | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 2551043e3..5f93aea21 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -94,16 +94,11 @@ public function consume(int $timeout = 0): void $now = time(); $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds - $currentQueueNames = []; while (true) { - if (empty($currentQueueNames)) { - $currentQueueNames = $queueNames; - } - $this->removeExpiredMessages(); $this->redeliverMessages(); - if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { + if ($message = $this->fetchMessage($queueNames, $redeliveryDelay)) { /** * @var DbalConsumer * @var callable $callback @@ -113,11 +108,7 @@ public function consume(int $timeout = 0): void if (false === call_user_func($callback, $message, $consumer)) { return; } - - unset($currentQueueNames[$message->getQueue()]); } else { - $currentQueueNames = []; - usleep($this->getPollingInterval() * 1000); } From b313429ae354508a633721db2dc0b7f77d82b6e1 Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 7 Oct 2021 20:03:36 +0200 Subject: [PATCH 2/5] add logic to walk through the different queues after each message --- pkg/dbal/DbalConsumerHelperTrait.php | 27 +++++++++++++++++++++++---- pkg/dbal/DbalSubscriptionConsumer.php | 8 +++++++- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 0d57541d6..57071ccce 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -30,16 +30,17 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa $endAt = microtime(true) + 0.2; // add 200ms $select = $this->getConnection()->createQueryBuilder() - ->select('id') + ->select('MIN(id)') + ->addSelect('queue') ->from($this->getContext()->getTableName()) ->andWhere('queue IN (:queues)') ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') ->andWhere('delivery_id IS NULL') ->addOrderBy('priority', 'asc') ->addOrderBy('published_at', 'asc') + ->groupBy('queue') ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY) - ->setParameter('delayedUntil', $now, DbalType::INTEGER) - ->setMaxResults(1); + ->setParameter('delayedUntil', $now, DbalType::INTEGER); $update = $this->getConnection()->createQueryBuilder() ->update($this->getContext()->getTableName()) @@ -53,7 +54,12 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa while (microtime(true) < $endAt) { try { - $result = $select->execute()->fetch(); + $results = $select->execute()->fetch(); + if (empty($results)) { + return null; + } + + $result = $this->getResultByQueueList($results, $queues); if (empty($result)) { return null; } @@ -155,4 +161,17 @@ private function deleteMessage(string $deliveryId): void ['delivery_id' => DbalType::GUID] ); } + + private function getResultByQueueList(array $results, array $queues): ?array + { + $results = array_combine(array_column($results, 'queue'), $results); + + foreach ($queues as $queue) { + if (isset($results[$queue])) { + return $results[$queue]; + } + } + + return null; + } } diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 5f93aea21..e76c7d8a1 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -87,8 +87,9 @@ public function consume(int $timeout = 0): void $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { - $queueNames[$queueName] = $queueName; + $queueNames[] = $queueName; } + $queueNames = array_unique($queueNames); $timeout /= 1000; $now = time(); @@ -108,6 +109,11 @@ public function consume(int $timeout = 0): void if (false === call_user_func($callback, $message, $consumer)) { return; } + + $queueNames = array_filter($queueNames, static function ($queueName) use ($message) { + return $message->getQueue() !== $queueName; + }); + $queueNames[] = $message->getQueue(); } else { usleep($this->getPollingInterval() * 1000); } From 1f00e43660aa914b2b7e9816e018d919921e480d Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 7 Oct 2021 20:17:34 +0200 Subject: [PATCH 3/5] small changes reagrding naming --- pkg/dbal/DbalConsumerHelperTrait.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 57071ccce..05581cf7e 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -30,8 +30,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa $endAt = microtime(true) + 0.2; // add 200ms $select = $this->getConnection()->createQueryBuilder() - ->select('MIN(id)') - ->addSelect('queue') + ->select('MIN(id) as id', 'queue') ->from($this->getContext()->getTableName()) ->andWhere('queue IN (:queues)') ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') From d9af231e19ad603196ff687d0ed7a4216e4dcd17 Mon Sep 17 00:00:00 2001 From: Henning Date: Thu, 7 Oct 2021 21:45:08 +0200 Subject: [PATCH 4/5] micro select for each queue --- pkg/dbal/DbalConsumerHelperTrait.php | 40 +++++++++++++--------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 05581cf7e..14dee6f7a 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -29,18 +29,6 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa $endAt = microtime(true) + 0.2; // add 200ms - $select = $this->getConnection()->createQueryBuilder() - ->select('MIN(id) as id', 'queue') - ->from($this->getContext()->getTableName()) - ->andWhere('queue IN (:queues)') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') - ->andWhere('delivery_id IS NULL') - ->addOrderBy('priority', 'asc') - ->addOrderBy('published_at', 'asc') - ->groupBy('queue') - ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY) - ->setParameter('delayedUntil', $now, DbalType::INTEGER); - $update = $this->getConnection()->createQueryBuilder() ->update($this->getContext()->getTableName()) ->set('delivery_id', ':deliveryId') @@ -53,12 +41,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa while (microtime(true) < $endAt) { try { - $results = $select->execute()->fetch(); - if (empty($results)) { - return null; - } - - $result = $this->getResultByQueueList($results, $queues); + $result = $this->getResultByQueueList($queues, $now); if (empty($result)) { return null; } @@ -161,13 +144,26 @@ private function deleteMessage(string $deliveryId): void ); } - private function getResultByQueueList(array $results, array $queues): ?array + private function getResultByQueueList(array $queues, int $now): ?array { - $results = array_combine(array_column($results, 'queue'), $results); + $select = $this->getConnection()->createQueryBuilder() + ->select('id') + ->from($this->getContext()->getTableName()) + ->andWhere('queue = :queue') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('delivery_id IS NULL') + ->addOrderBy('priority', 'asc') + ->addOrderBy('published_at', 'asc') + ->setParameter('delayedUntil', $now, DbalType::INTEGER) + ->setMaxResults(1); foreach ($queues as $queue) { - if (isset($results[$queue])) { - return $results[$queue]; + $select->setParameter('queue', $queue, DbalType::STRING); + + $result = $select->execute()->fetch(); + + if (!empty($result)) { + return $result; } } From 626002857cbe02d99901d12dbf38ac18263054a4 Mon Sep 17 00:00:00 2001 From: Henning Date: Mon, 10 Jan 2022 18:47:44 +0100 Subject: [PATCH 5/5] if no queue was consumed, then wait the polling interval. Otherweise continue processing the queues! --- pkg/dbal/DbalConsumerHelperTrait.php | 40 +++++++++------------------ pkg/dbal/DbalSubscriptionConsumer.php | 27 ++++++++++++------ 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index aba7c9ac7..4a3d32997 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -29,6 +29,18 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa $endAt = microtime(true) + 0.2; // add 200ms + $select = $this->getConnection()->createQueryBuilder() + ->select('id') + ->from($this->getContext()->getTableName()) + ->andWhere('queue IN (:queues)') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('delivery_id IS NULL') + ->addOrderBy('priority', 'asc') + ->addOrderBy('published_at', 'asc') + ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY) + ->setParameter('delayedUntil', $now, DbalType::INTEGER) + ->setMaxResults(1); + $update = $this->getConnection()->createQueryBuilder() ->update($this->getContext()->getTableName()) ->set('delivery_id', ':deliveryId') @@ -41,7 +53,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa while (microtime(true) < $endAt) { try { - $result = $this->getResultByQueueList($queues, $now); + $result = $select->execute()->fetch(); if (empty($result)) { return null; } @@ -143,30 +155,4 @@ private function deleteMessage(string $deliveryId): void ['delivery_id' => DbalType::GUID] ); } - - private function getResultByQueueList(array $queues, int $now): ?array - { - $select = $this->getConnection()->createQueryBuilder() - ->select('id') - ->from($this->getContext()->getTableName()) - ->andWhere('queue = :queue') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') - ->andWhere('delivery_id IS NULL') - ->addOrderBy('priority', 'asc') - ->addOrderBy('published_at', 'asc') - ->setParameter('delayedUntil', $now, DbalType::INTEGER) - ->setMaxResults(1); - - foreach ($queues as $queue) { - $select->setParameter('queue', $queue, DbalType::STRING); - - $result = $select->execute()->fetch(); - - if (!empty($result)) { - return $result; - } - } - - return null; - } } diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index e76c7d8a1..145496917 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -87,21 +87,29 @@ public function consume(int $timeout = 0): void $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { - $queueNames[] = $queueName; + $queueNames[$queueName] = $queueName; } - $queueNames = array_unique($queueNames); $timeout /= 1000; $now = time(); $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + $currentQueueNames = []; + $queueConsumed = false; while (true) { + if (empty($currentQueueNames)) { + $currentQueueNames = $queueNames; + $queueConsumed = false; + } + $this->removeExpiredMessages(); $this->redeliverMessages(); - if ($message = $this->fetchMessage($queueNames, $redeliveryDelay)) { + if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { + $queueConsumed = true; + /** - * @var DbalConsumer + * @var DbalConsumer $consumer * @var callable $callback */ [$consumer, $callback] = $this->subscribers[$message->getQueue()]; @@ -110,12 +118,13 @@ public function consume(int $timeout = 0): void return; } - $queueNames = array_filter($queueNames, static function ($queueName) use ($message) { - return $message->getQueue() !== $queueName; - }); - $queueNames[] = $message->getQueue(); + unset($currentQueueNames[$message->getQueue()]); } else { - usleep($this->getPollingInterval() * 1000); + $currentQueueNames = []; + + if (!$queueConsumed) { + usleep($this->getPollingInterval() * 1000); + } } if ($timeout && microtime(true) >= $now + $timeout) {