Skip to content

Commit 6260028

Browse files
author
Henning
committed
if no queue was consumed, then wait the polling interval. Otherweise continue processing the queues!
1 parent 876c6c9 commit 6260028

File tree

2 files changed

+31
-36
lines changed

2 files changed

+31
-36
lines changed

pkg/dbal/DbalConsumerHelperTrait.php

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,18 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
2929

3030
$endAt = microtime(true) + 0.2; // add 200ms
3131

32+
$select = $this->getConnection()->createQueryBuilder()
33+
->select('id')
34+
->from($this->getContext()->getTableName())
35+
->andWhere('queue IN (:queues)')
36+
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
37+
->andWhere('delivery_id IS NULL')
38+
->addOrderBy('priority', 'asc')
39+
->addOrderBy('published_at', 'asc')
40+
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
41+
->setParameter('delayedUntil', $now, DbalType::INTEGER)
42+
->setMaxResults(1);
43+
3244
$update = $this->getConnection()->createQueryBuilder()
3345
->update($this->getContext()->getTableName())
3446
->set('delivery_id', ':deliveryId')
@@ -41,7 +53,7 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
4153

4254
while (microtime(true) < $endAt) {
4355
try {
44-
$result = $this->getResultByQueueList($queues, $now);
56+
$result = $select->execute()->fetch();
4557
if (empty($result)) {
4658
return null;
4759
}
@@ -143,30 +155,4 @@ private function deleteMessage(string $deliveryId): void
143155
['delivery_id' => DbalType::GUID]
144156
);
145157
}
146-
147-
private function getResultByQueueList(array $queues, int $now): ?array
148-
{
149-
$select = $this->getConnection()->createQueryBuilder()
150-
->select('id')
151-
->from($this->getContext()->getTableName())
152-
->andWhere('queue = :queue')
153-
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
154-
->andWhere('delivery_id IS NULL')
155-
->addOrderBy('priority', 'asc')
156-
->addOrderBy('published_at', 'asc')
157-
->setParameter('delayedUntil', $now, DbalType::INTEGER)
158-
->setMaxResults(1);
159-
160-
foreach ($queues as $queue) {
161-
$select->setParameter('queue', $queue, DbalType::STRING);
162-
163-
$result = $select->execute()->fetch();
164-
165-
if (!empty($result)) {
166-
return $result;
167-
}
168-
}
169-
170-
return null;
171-
}
172158
}

pkg/dbal/DbalSubscriptionConsumer.php

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,21 +87,29 @@ public function consume(int $timeout = 0): void
8787

8888
$queueNames = [];
8989
foreach (array_keys($this->subscribers) as $queueName) {
90-
$queueNames[] = $queueName;
90+
$queueNames[$queueName] = $queueName;
9191
}
92-
$queueNames = array_unique($queueNames);
9392

9493
$timeout /= 1000;
9594
$now = time();
9695
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
9796

97+
$currentQueueNames = [];
98+
$queueConsumed = false;
9899
while (true) {
100+
if (empty($currentQueueNames)) {
101+
$currentQueueNames = $queueNames;
102+
$queueConsumed = false;
103+
}
104+
99105
$this->removeExpiredMessages();
100106
$this->redeliverMessages();
101107

102-
if ($message = $this->fetchMessage($queueNames, $redeliveryDelay)) {
108+
if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) {
109+
$queueConsumed = true;
110+
103111
/**
104-
* @var DbalConsumer
112+
* @var DbalConsumer $consumer
105113
* @var callable $callback
106114
*/
107115
[$consumer, $callback] = $this->subscribers[$message->getQueue()];
@@ -110,12 +118,13 @@ public function consume(int $timeout = 0): void
110118
return;
111119
}
112120

113-
$queueNames = array_filter($queueNames, static function ($queueName) use ($message) {
114-
return $message->getQueue() !== $queueName;
115-
});
116-
$queueNames[] = $message->getQueue();
121+
unset($currentQueueNames[$message->getQueue()]);
117122
} else {
118-
usleep($this->getPollingInterval() * 1000);
123+
$currentQueueNames = [];
124+
125+
if (!$queueConsumed) {
126+
usleep($this->getPollingInterval() * 1000);
127+
}
119128
}
120129

121130
if ($timeout && microtime(true) >= $now + $timeout) {

0 commit comments

Comments
 (0)