Skip to content

Commit ebb9ca2

Browse files
Add handling for delayed message to redis transport
1 parent b0c2112 commit ebb9ca2

File tree

3 files changed

+71
-16
lines changed

3 files changed

+71
-16
lines changed

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ protected function setUp(): void
3131

3232
$this->redis = new \Redis();
3333
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
34-
$this->clearRedis();
34+
$this->connection->cleanup();
3535
$this->connection->setup();
3636
}
3737

@@ -55,11 +55,14 @@ public function testGetTheFirstAvailableMessage()
5555
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
5656
}
5757

58-
private function clearRedis()
58+
public function testConnectionSendAndGetDelayed()
5959
{
60-
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
61-
$pathParts = explode('/', $parsedUrl['path'] ?? '');
62-
$stream = $pathParts[1] ?? 'symfony';
63-
$this->redis->del($stream);
60+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class], 1500);
61+
$encoded = $this->connection->get();
62+
$this->assertEquals(null, $encoded);
63+
sleep(2);
64+
$encoded = $this->connection->get();
65+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
66+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
6467
}
6568
}

src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class Connection
3636

3737
private $connection;
3838
private $stream;
39+
private $queue;
3940
private $group;
4041
private $consumer;
4142
private $autoSetup;
@@ -59,6 +60,7 @@ public function __construct(array $configuration, array $connectionCredentials =
5960
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
6061
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
6162
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
63+
$this->queue = $this->stream.'__queue';
6264
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
6365
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
6466
}
@@ -112,6 +114,24 @@ public function get(): ?array
112114
$this->setup();
113115
}
114116

117+
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
118+
119+
if ($queuedMessageCount) {
120+
for ($i = 0; $i < $queuedMessageCount; ++$i) {
121+
foreach ($this->connection->zpopmin($this->queue, 1) as $queuedMessage => $time) {
122+
$queuedMessage = json_decode($queuedMessage, true);
123+
// if a futured placed message is actually popped because of a race condition with
124+
// another running message consumer, the message is readded to the queue by add function
125+
// else its just added stream and will be available for all stream consumers
126+
$this->add(
127+
$queuedMessage['body'],
128+
$queuedMessage['headers'],
129+
$time - $this->getCurrentTimeInMilliseconds()
130+
);
131+
}
132+
}
133+
}
134+
115135
$messageId = '>'; // will receive new messages
116136

117137
if ($this->couldHavePendingMessages) {
@@ -191,24 +211,37 @@ public function reject(string $id): void
191211
}
192212
}
193213

194-
public function add(string $body, array $headers): void
214+
/**
215+
* @param int $delay The delay in milliseconds
216+
*/
217+
public function add(string $body, array $headers, int $delay = 0): void
195218
{
196219
if ($this->autoSetup) {
197220
$this->setup();
198221
}
199222

200223
try {
201-
if ($this->maxEntries) {
202-
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
203-
['body' => $body, 'headers' => $headers]
204-
)], $this->maxEntries, true);
224+
$message = json_encode([
225+
'body' => $body,
226+
'headers' => $headers,
227+
'uniqid' => uniqid('', true), // Entry need to be unique in the sorted set
228+
]);
229+
230+
if ($delay > 0) { // the delay could be smaller 0 in a queued message
231+
$score = (int) ($this->getCurrentTimeInMilliseconds() + $delay);
232+
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
205233
} else {
206-
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
207-
['body' => $body, 'headers' => $headers]
208-
)]);
234+
if ($this->maxEntries) {
235+
$added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
236+
} else {
237+
$added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
238+
}
209239
}
210240
} catch (\RedisException $e) {
211-
throw new TransportException($e->getMessage(), 0, $e);
241+
if ($error = $this->connection->getLastError() ?: null) {
242+
$this->connection->clearLastError();
243+
}
244+
throw new TransportException($error ?? $e->getMessage(), 0, $e);
212245
}
213246

214247
if (!$added) {
@@ -234,4 +267,18 @@ public function setup(): void
234267

235268
$this->autoSetup = false;
236269
}
270+
271+
private function getCurrentTimeInMilliseconds(): int
272+
{
273+
return (int) (microtime(true) * 1000);
274+
}
275+
276+
/**
277+
* @internal
278+
*/
279+
public function cleanup()
280+
{
281+
$this->connection->del($this->stream);
282+
$this->connection->del($this->queue);
283+
}
237284
}

src/Symfony/Component/Messenger/Transport/RedisExt/RedisSender.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Transport\RedisExt;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\DelayStamp;
1516
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1718

@@ -37,7 +38,11 @@ public function send(Envelope $envelope): Envelope
3738
{
3839
$encodedMessage = $this->serializer->encode($envelope);
3940

40-
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? []);
41+
/** @var DelayStamp|null $delayStamp */
42+
$delayStamp = $envelope->last(DelayStamp::class);
43+
$delay = null !== $delayStamp ? $delayStamp->getDelay() : 0;
44+
45+
$this->connection->add($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
4146

4247
return $envelope;
4348
}

0 commit comments

Comments
 (0)