Skip to content

Commit 05793ed

Browse files
committed
add fallback subscription consumer, add subscription consumer support to queue consumer.
1 parent 7d417dc commit 05793ed

File tree

3 files changed

+467
-3
lines changed

3 files changed

+467
-3
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Interop\Queue\PsrConsumer;
6+
use Interop\Queue\PsrSubscriptionConsumer;
7+
8+
final class FallbackSubscriptionConsumer implements PsrSubscriptionConsumer
9+
{
10+
/**
11+
* an item contains an array: [PsrConsumer $consumer, callable $callback];.
12+
* an item key is a queue name.
13+
*
14+
* @var array
15+
*/
16+
private $subscribers;
17+
18+
/**
19+
* @var int|float the time in milliseconds the consumer waits if no message has been received
20+
*/
21+
private $idleTime = 0;
22+
23+
public function __construct()
24+
{
25+
$this->subscribers = [];
26+
}
27+
28+
/**
29+
* {@inheritdoc}
30+
*/
31+
public function consume($timeout = 0)
32+
{
33+
if (empty($this->subscribers)) {
34+
throw new \LogicException('No subscribers');
35+
}
36+
37+
$timeout /= 1000;
38+
$endAt = microtime(true) + $timeout;
39+
40+
while (true) {
41+
/**
42+
* @var string
43+
* @var PsrConsumer $consumer
44+
* @var callable $processor
45+
*/
46+
foreach ($this->subscribers as $queueName => list($consumer, $callback)) {
47+
$message = $consumer->receiveNoWait();
48+
49+
if ($message) {
50+
if (false === call_user_func($callback, $message, $consumer)) {
51+
return;
52+
}
53+
} else {
54+
if ($timeout && microtime(true) >= $endAt) {
55+
return;
56+
}
57+
58+
$this->idleTime && usleep($this->idleTime);
59+
}
60+
61+
if ($timeout && microtime(true) >= $endAt) {
62+
return;
63+
}
64+
}
65+
}
66+
}
67+
68+
/**
69+
* {@inheritdoc}
70+
*/
71+
public function subscribe(PsrConsumer $consumer, callable $callback)
72+
{
73+
$queueName = $consumer->getQueue()->getQueueName();
74+
if (array_key_exists($queueName, $this->subscribers)) {
75+
if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
76+
return;
77+
}
78+
79+
throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
80+
}
81+
82+
$this->subscribers[$queueName] = [$consumer, $callback];
83+
}
84+
85+
/**
86+
* {@inheritdoc}
87+
*/
88+
public function unsubscribe(PsrConsumer $consumer)
89+
{
90+
if (false == array_key_exists($consumer->getQueue()->getQueueName(), $this->subscribers)) {
91+
return;
92+
}
93+
94+
if ($this->subscribers[$consumer->getQueue()->getQueueName()][0] !== $consumer) {
95+
return;
96+
}
97+
98+
unset($this->subscribers[$consumer->getQueue()->getQueueName()]);
99+
}
100+
101+
/**
102+
* {@inheritdoc}
103+
*/
104+
public function unsubscribeAll()
105+
{
106+
$this->subscribers = [];
107+
}
108+
109+
/**
110+
* @return float|int
111+
*/
112+
public function getIdleTime()
113+
{
114+
return $this->idleTime;
115+
}
116+
117+
/**
118+
* @param float|int $idleTime
119+
*/
120+
public function setIdleTime($idleTime)
121+
{
122+
$this->idleTime = $idleTime;
123+
}
124+
}

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Interop\Queue\PsrMessage;
1515
use Interop\Queue\PsrProcessor;
1616
use Interop\Queue\PsrQueue;
17+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
1718
use Psr\Log\LoggerInterface;
1819
use Psr\Log\NullLogger;
1920

@@ -58,6 +59,13 @@ class QueueConsumer
5859
*/
5960
private $logger;
6061

62+
/**
63+
* @deprecated added as BC layer, will be a default behavior in 0.9 version.
64+
*
65+
* @var bool
66+
*/
67+
private $enableSubscriptionConsumer;
68+
6169
/**
6270
* @param PsrContext $psrContext
6371
* @param ExtensionInterface|ChainExtension|null $extension
@@ -77,6 +85,8 @@ public function __construct(
7785

7886
$this->boundProcessors = [];
7987
$this->logger = new NullLogger();
88+
89+
$this->enableSubscriptionConsumer = false;
8090
}
8191

8292
/**
@@ -187,7 +197,45 @@ public function consume(ExtensionInterface $runtimeExtension = null)
187197

188198
$this->logger->info('Start consuming');
189199

190-
if ($this->psrContext instanceof AmqpContext) {
200+
$subscriptionConsumer = null;
201+
if ($this->enableSubscriptionConsumer) {
202+
$subscriptionConsumer = new FallbackSubscriptionConsumer();
203+
if ($context instanceof PsrSubscriptionConsumerAwareContext) {
204+
$subscriptionConsumer = $context->createSubscriptionConsumer();
205+
}
206+
207+
$callback = function (PsrMessage $message, PsrConsumer $consumer) use (&$context) {
208+
$currentProcessor = null;
209+
210+
/** @var PsrQueue $queue */
211+
foreach ($this->boundProcessors as list($queue, $processor)) {
212+
if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) {
213+
$currentProcessor = $processor;
214+
}
215+
}
216+
217+
if (false == $currentProcessor) {
218+
throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName()));
219+
}
220+
221+
$context = new Context($this->psrContext);
222+
$context->setLogger($this->logger);
223+
$context->setPsrQueue($consumer->getQueue());
224+
$context->setPsrConsumer($consumer);
225+
$context->setPsrProcessor($currentProcessor);
226+
$context->setPsrMessage($message);
227+
228+
$this->doConsume($this->extension, $context);
229+
230+
return true;
231+
};
232+
233+
foreach ($consumers as $consumer) {
234+
/* @var AmqpConsumer $consumer */
235+
236+
$subscriptionConsumer->subscribe($consumer, $callback);
237+
}
238+
} elseif ($this->psrContext instanceof AmqpContext) {
191239
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use (&$context) {
192240
$currentProcessor = null;
193241

@@ -223,7 +271,18 @@ public function consume(ExtensionInterface $runtimeExtension = null)
223271

224272
while (true) {
225273
try {
226-
if ($this->psrContext instanceof AmqpContext) {
274+
if ($this->enableSubscriptionConsumer) {
275+
$this->extension->onBeforeReceive($context);
276+
277+
if ($context->isExecutionInterrupted()) {
278+
throw new ConsumptionInterruptedException();
279+
}
280+
281+
$subscriptionConsumer->consume($this->receiveTimeout);
282+
283+
usleep($this->idleTimeout * 1000);
284+
$this->extension->onIdle($context);
285+
} elseif ($this->psrContext instanceof AmqpContext) {
227286
$this->extension->onBeforeReceive($context);
228287

229288
if ($context->isExecutionInterrupted()) {
@@ -251,7 +310,13 @@ public function consume(ExtensionInterface $runtimeExtension = null)
251310
} catch (ConsumptionInterruptedException $e) {
252311
$this->logger->info(sprintf('Consuming interrupted'));
253312

254-
if ($this->psrContext instanceof AmqpContext) {
313+
if ($this->enableSubscriptionConsumer) {
314+
foreach ($consumers as $consumer) {
315+
/* @var PsrConsumer $consumer */
316+
317+
$subscriptionConsumer->unsubscribe($consumer);
318+
}
319+
} elseif ($this->psrContext instanceof AmqpContext) {
255320
foreach ($consumers as $consumer) {
256321
/* @var AmqpConsumer $consumer */
257322

@@ -279,6 +344,14 @@ public function consume(ExtensionInterface $runtimeExtension = null)
279344
}
280345
}
281346

347+
/**
348+
* @param bool $enableSubscriptionConsumer
349+
*/
350+
public function enableSubscriptionConsumer(bool $enableSubscriptionConsumer)
351+
{
352+
$this->enableSubscriptionConsumer = $enableSubscriptionConsumer;
353+
}
354+
282355
/**
283356
* @param ExtensionInterface $extension
284357
* @param Context $context

0 commit comments

Comments
 (0)