Skip to content

Commit 811c2be

Browse files
committed
Dbal Subscription Consumer feature
1 parent adb5e10 commit 811c2be

9 files changed

+534
-24
lines changed

pkg/dbal/DbalConsumer.php

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ protected function receiveMessage(): ?DbalMessage
142142
$this->dbal->commit();
143143

144144
if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) {
145-
return $this->convertMessage($dbalMessage);
145+
return DbalMessage::fromArray($dbalMessage);
146146
}
147147

148148
return null;
@@ -153,27 +153,6 @@ protected function receiveMessage(): ?DbalMessage
153153
}
154154
}
155155

156-
protected function convertMessage(array $dbalMessage): DbalMessage
157-
{
158-
/** @var DbalMessage $message */
159-
$message = $this->context->createMessage();
160-
161-
$message->setBody($dbalMessage['body']);
162-
$message->setPriority((int) $dbalMessage['priority']);
163-
$message->setRedelivered((bool) $dbalMessage['redelivered']);
164-
$message->setPublishedAt((int) $dbalMessage['published_at']);
165-
166-
if ($dbalMessage['headers']) {
167-
$message->setHeaders(JSON::decode($dbalMessage['headers']));
168-
}
169-
170-
if ($dbalMessage['properties']) {
171-
$message->setProperties(JSON::decode($dbalMessage['properties']));
172-
}
173-
174-
return $message;
175-
}
176-
177156
private function fetchPrioritizedMessage(int $now): ?array
178157
{
179158
$query = $this->dbal->createQueryBuilder();

pkg/dbal/DbalContext.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
use Interop\Queue\Context;
1212
use Interop\Queue\Destination;
1313
use Interop\Queue\Exception\InvalidDestinationException;
14-
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
1514
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
1615
use Interop\Queue\Message;
1716
use Interop\Queue\Producer;
@@ -126,7 +125,7 @@ public function close(): void
126125

127126
public function createSubscriptionConsumer(): SubscriptionConsumer
128127
{
129-
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
128+
return new DbalSubscriptionConsumer($this);
130129
}
131130

132131
/**

pkg/dbal/DbalMessage.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,27 @@ public function __construct(string $body = '', array $properties = [], array $he
6767
$this->deliveryDelay = null;
6868
}
6969

70+
public static function fromArray(array $dbalMessage): self
71+
{
72+
$dbalMessageObj = new self(
73+
$dbalMessage['body'],
74+
$dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [],
75+
$dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : []
76+
);
77+
78+
if (isset($dbalMessage['redelivered'])) {
79+
$dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']);
80+
}
81+
if (isset($dbalMessage['priority'])) {
82+
$dbalMessageObj->setPriority((int) $dbalMessage['priority']);
83+
}
84+
if (isset($dbalMessage['published_at'])) {
85+
$dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']);
86+
}
87+
88+
return $dbalMessageObj;
89+
}
90+
7091
public function setBody(string $body): void
7192
{
7293
$this->body = $body;

pkg/dbal/DbalSubscriptionConsumer.php

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Dbal;
6+
7+
use Doctrine\DBAL\Types\Type;
8+
use Interop\Queue\Consumer;
9+
use Interop\Queue\SubscriptionConsumer;
10+
11+
class DbalSubscriptionConsumer implements SubscriptionConsumer
12+
{
13+
/**
14+
* @var DbalContext
15+
*/
16+
private $context;
17+
18+
/**
19+
* an item contains an array: [DbalConsumer $consumer, callable $callback];.
20+
*
21+
* @var array
22+
*/
23+
private $subscribers;
24+
25+
/**
26+
* @var \Doctrine\DBAL\Connection
27+
*/
28+
private $dbal;
29+
30+
/**
31+
* @param DbalContext $context
32+
*/
33+
public function __construct(DbalContext $context)
34+
{
35+
$this->context = $context;
36+
$this->dbal = $this->context->getDbalConnection();
37+
$this->subscribers = [];
38+
}
39+
40+
public function consume(int $timeout = 0): void
41+
{
42+
if (empty($this->subscribers)) {
43+
throw new \LogicException('No subscribers');
44+
}
45+
46+
$timeout = (int) ceil($timeout / 1000);
47+
$endAt = time() + $timeout;
48+
49+
$queueNames = [];
50+
foreach (array_keys($this->subscribers) as $queueName) {
51+
$queueNames[$queueName] = $queueName;
52+
}
53+
54+
$currentQueueNames = [];
55+
while (true) {
56+
if (empty($currentQueueNames)) {
57+
$currentQueueNames = $queueNames;
58+
}
59+
60+
$message = $this->fetchPrioritizedMessage($currentQueueNames) ?: $this->fetchMessage($currentQueueNames);
61+
62+
if ($message) {
63+
$this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]);
64+
65+
$dbalMessage = DbalMessage::fromArray($message);
66+
67+
/**
68+
* @var DbalConsumer
69+
* @var callable $callback
70+
*/
71+
list($consumer, $callback) = $this->subscribers[$message['queue']];
72+
73+
if (false === call_user_func($callback, $dbalMessage, $consumer)) {
74+
return;
75+
}
76+
77+
unset($currentQueueNames[$message['queue']]);
78+
} else {
79+
$currentQueueNames = [];
80+
}
81+
82+
if ($timeout && microtime(true) >= $endAt) {
83+
return;
84+
}
85+
}
86+
}
87+
88+
/**
89+
* @param DbalConsumer $consumer
90+
*/
91+
public function subscribe(Consumer $consumer, callable $callback): void
92+
{
93+
if (false == $consumer instanceof DbalConsumer) {
94+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer)));
95+
}
96+
97+
$queueName = $consumer->getQueue()->getQueueName();
98+
if (array_key_exists($queueName, $this->subscribers)) {
99+
if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
100+
return;
101+
}
102+
103+
throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
104+
}
105+
106+
$this->subscribers[$queueName] = [$consumer, $callback];
107+
}
108+
109+
/**
110+
* @param DbalConsumer $consumer
111+
*/
112+
public function unsubscribe(Consumer $consumer): void
113+
{
114+
if (false == $consumer instanceof DbalConsumer) {
115+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer)));
116+
}
117+
118+
$queueName = $consumer->getQueue()->getQueueName();
119+
120+
if (false == array_key_exists($queueName, $this->subscribers)) {
121+
return;
122+
}
123+
124+
if ($this->subscribers[$queueName][0] !== $consumer) {
125+
return;
126+
}
127+
128+
unset($this->subscribers[$queueName]);
129+
}
130+
131+
public function unsubscribeAll(): void
132+
{
133+
$this->subscribers = [];
134+
}
135+
136+
private function fetchMessage(array $queues): ?array
137+
{
138+
$query = $this->dbal->createQueryBuilder();
139+
$query
140+
->select('*')
141+
->from($this->context->getTableName())
142+
->andWhere('queue IN (:queues)')
143+
->andWhere('priority IS NULL')
144+
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
145+
->addOrderBy('published_at', 'asc')
146+
->setMaxResults(1)
147+
;
148+
149+
$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
150+
151+
$result = $this->dbal->executeQuery(
152+
$sql,
153+
[
154+
'queues' => array_keys($queues),
155+
'delayedUntil' => time(),
156+
],
157+
[
158+
'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY,
159+
'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER,
160+
]
161+
)->fetch();
162+
163+
return $result ?: null;
164+
}
165+
166+
private function fetchPrioritizedMessage(array $queues): ?array
167+
{
168+
$query = $this->dbal->createQueryBuilder();
169+
$query
170+
->select('*')
171+
->from($this->context->getTableName())
172+
->andWhere('queue IN (:queues)')
173+
->andWhere('priority IS NOT NULL')
174+
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
175+
->addOrderBy('published_at', 'asc')
176+
->addOrderBy('priority', 'desc')
177+
->setMaxResults(1)
178+
;
179+
180+
$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
181+
182+
$result = $this->dbal->executeQuery(
183+
$sql,
184+
[
185+
'queues' => array_keys($queues),
186+
'delayedUntil' => time(),
187+
],
188+
[
189+
'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY,
190+
'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER,
191+
]
192+
)->fetch();
193+
194+
return $result ?: null;
195+
}
196+
}

pkg/dbal/Tests/DbalMessageTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace Enqueue\Dbal\Tests;
46

57
use Enqueue\Dbal\DbalMessage;
@@ -27,6 +29,20 @@ public function testCouldBeConstructedWithOptionalArguments()
2729
$this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
2830
}
2931

32+
public function testCouldBeCreatedFromArray()
33+
{
34+
$arrayData = [
35+
'body' => 'theBody',
36+
'properties' => json_encode(['barProp' => 'barPropVal']),
37+
'headers' => json_encode(['fooHeader' => 'fooHeaderVal']),
38+
];
39+
$message = DbalMessage::fromArray($arrayData);
40+
41+
$this->assertSame('theBody', $message->getBody());
42+
$this->assertSame(['barProp' => 'barPropVal'], $message->getProperties());
43+
$this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
44+
}
45+
3046
public function testShouldSetPriorityToNullInConstructor()
3147
{
3248
$message = new DbalMessage();

0 commit comments

Comments
 (0)