Skip to content

Commit e0bf5fe

Browse files
authored
Merge pull request #5897 from magento-performance/MC-35827-2
Fixed issues: - MC-35884 Consumers improvement - MC-35827 Test and merge PR with extended message queue consumer configuration
2 parents 1e64fa5 + 3d9136d commit e0bf5fe

26 files changed

+788
-60
lines changed

app/code/Magento/AsynchronousOperations/Model/MassConsumer.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,19 @@ public function process($maxNumberOfMessages = null)
6969
$this->registry->register('isSecureArea', true, true);
7070

7171
$queue = $this->configuration->getQueue();
72+
$maxIdleTime = $this->configuration->getMaxIdleTime();
73+
$sleep = $this->configuration->getSleep();
7274

7375
if (!isset($maxNumberOfMessages)) {
7476
$queue->subscribe($this->getTransactionCallback($queue));
7577
} else {
76-
$this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
78+
$this->invoker->invoke(
79+
$queue,
80+
$maxNumberOfMessages,
81+
$this->getTransactionCallback($queue),
82+
$maxIdleTime,
83+
$sleep
84+
);
7785
}
7886

7987
$this->registry->unregister('isSecureArea');
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\MessageQueue\Model;
9+
10+
use Magento\Framework\MessageQueue\QueueRepository;
11+
12+
/**
13+
* Class CheckIsAvailableMessagesInQueue for checking messages available in queue
14+
*/
15+
class CheckIsAvailableMessagesInQueue
16+
{
17+
/**
18+
* @var QueueRepository
19+
*/
20+
private $queueRepository;
21+
22+
/**
23+
* Initialize dependencies.
24+
*
25+
* @param QueueRepository $queueRepository
26+
*/
27+
public function __construct(QueueRepository $queueRepository)
28+
{
29+
$this->queueRepository = $queueRepository;
30+
}
31+
32+
/**
33+
* Checks if there is available messages in the queue
34+
*
35+
* @param string $connectionName connection name
36+
* @param string $queueName queue name
37+
* @return bool
38+
* @throws \LogicException if queue is not available
39+
*/
40+
public function execute($connectionName, $queueName)
41+
{
42+
$queue = $this->queueRepository->get($connectionName, $queueName);
43+
$message = $queue->dequeue();
44+
if ($message) {
45+
$queue->reject($message);
46+
return true;
47+
}
48+
return false;
49+
}
50+
}

app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Psr\Log\LoggerInterface;
1515
use Symfony\Component\Process\PhpExecutableFinder;
1616
use Magento\Framework\Lock\LockManagerInterface;
17+
use Magento\MessageQueue\Model\CheckIsAvailableMessagesInQueue;
1718

1819
/**
1920
* Class for running consumers processes by cron
@@ -65,6 +66,11 @@ class ConsumersRunner
6566
*/
6667
private $lockManager;
6768

69+
/**
70+
* @var CheckIsAvailableMessagesInQueue
71+
*/
72+
private $checkIsAvailableMessages;
73+
6874
/**
6975
* @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed
7076
* for the PHP executable
@@ -74,6 +80,7 @@ class ConsumersRunner
7480
* @param LockManagerInterface $lockManager The lock manager
7581
* @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
7682
* @param LoggerInterface $logger Logger
83+
* @param CheckIsAvailableMessagesInQueue $checkIsAvailableMessages
7784
*/
7885
public function __construct(
7986
PhpExecutableFinder $phpExecutableFinder,
@@ -82,7 +89,8 @@ public function __construct(
8289
ShellInterface $shellBackground,
8390
LockManagerInterface $lockManager,
8491
ConnectionTypeResolver $mqConnectionTypeResolver = null,
85-
LoggerInterface $logger = null
92+
LoggerInterface $logger = null,
93+
CheckIsAvailableMessagesInQueue $checkIsAvailableMessages = null
8694
) {
8795
$this->phpExecutableFinder = $phpExecutableFinder;
8896
$this->consumerConfig = $consumerConfig;
@@ -93,6 +101,8 @@ public function __construct(
93101
?: ObjectManager::getInstance()->get(ConnectionTypeResolver::class);
94102
$this->logger = $logger
95103
?: ObjectManager::getInstance()->get(LoggerInterface::class);
104+
$this->checkIsAvailableMessages = $checkIsAvailableMessages
105+
?: ObjectManager::getInstance()->get(CheckIsAvailableMessagesInQueue::class);
96106
}
97107

98108
/**
@@ -166,6 +176,25 @@ private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $al
166176
return false;
167177
}
168178

179+
if ($consumerConfig->getOnlySpawnWhenMessageAvailable()) {
180+
try {
181+
return $this->checkIsAvailableMessages->execute(
182+
$connectionName,
183+
$consumerConfig->getQueue()
184+
);
185+
} catch (\LogicException $e) {
186+
$this->logger->info(
187+
sprintf(
188+
'Consumer "%s" skipped as its related queue "%s" is not available. %s',
189+
$consumerName,
190+
$consumerConfig->getQueue(),
191+
$e->getMessage()
192+
)
193+
);
194+
return false;
195+
}
196+
}
197+
169198
return true;
170199
}
171200
}

app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface;
1515
use Magento\Framework\ShellInterface;
1616
use Magento\MessageQueue\Model\Cron\ConsumersRunner;
17+
use Magento\MessageQueue\Model\CheckIsAvailableMessagesInQueue;
1718
use PHPUnit\Framework\MockObject\MockObject;
1819
use PHPUnit\Framework\TestCase;
1920
use Symfony\Component\Process\PhpExecutableFinder;
@@ -48,10 +49,15 @@ class ConsumersRunnerTest extends TestCase
4849
*/
4950
private $phpExecutableFinderMock;
5051

52+
/**
53+
* @var CheckIsAvailableMessagesInQueue|MockObject
54+
*/
55+
private $checkIsAvailableMessagesMock;
56+
5157
/**
5258
* @var ConnectionTypeResolver
5359
*/
54-
private $connectionTypeResover;
60+
private $connectionTypeResolver;
5561

5662
/**
5763
* @var ConsumersRunner
@@ -77,18 +83,21 @@ protected function setUp(): void
7783
$this->deploymentConfigMock = $this->getMockBuilder(DeploymentConfig::class)
7884
->disableOriginalConstructor()
7985
->getMock();
80-
$this->connectionTypeResover = $this->getMockBuilder(ConnectionTypeResolver::class)
86+
$this->checkIsAvailableMessagesMock = $this->createMock(CheckIsAvailableMessagesInQueue::class);
87+
$this->connectionTypeResolver = $this->getMockBuilder(ConnectionTypeResolver::class)
8188
->disableOriginalConstructor()
8289
->getMock();
83-
$this->connectionTypeResover->method('getConnectionType')->willReturn('something');
90+
$this->connectionTypeResolver->method('getConnectionType')->willReturn('something');
8491

8592
$this->consumersRunner = new ConsumersRunner(
8693
$this->phpExecutableFinderMock,
8794
$this->consumerConfigMock,
8895
$this->deploymentConfigMock,
8996
$this->shellBackgroundMock,
9097
$this->lockManagerMock,
91-
$this->connectionTypeResover
98+
$this->connectionTypeResolver,
99+
null,
100+
$this->checkIsAvailableMessagesMock
92101
);
93102
}
94103

@@ -262,4 +271,95 @@ public function runDataProvider()
262271
],
263272
];
264273
}
274+
275+
/**
276+
* @param boolean $onlySpawnWhenMessageAvailable
277+
* @param boolean $isMassagesAvailableInTheQueue
278+
* @param int $shellBackgroundExpects
279+
* @dataProvider runBasedOnOnlySpawnWhenMessageAvailableConsumerConfigurationDataProvider
280+
*/
281+
public function testRunBasedOnOnlySpawnWhenMessageAvailableConsumerConfiguration(
282+
$onlySpawnWhenMessageAvailable,
283+
$isMassagesAvailableInTheQueue,
284+
$shellBackgroundExpects
285+
) {
286+
$consumerName = 'consumerName';
287+
$connectionName = 'connectionName';
288+
$queueName = 'queueName';
289+
$this->deploymentConfigMock->expects($this->exactly(3))
290+
->method('get')
291+
->willReturnMap(
292+
[
293+
['cron_consumers_runner/cron_run', true, true],
294+
['cron_consumers_runner/max_messages', 10000, 1000],
295+
['cron_consumers_runner/consumers', [], []],
296+
]
297+
);
298+
299+
/** @var ConsumerConfigInterface|MockObject $firstCunsumer */
300+
$consumer = $this->getMockBuilder(ConsumerConfigItemInterface::class)
301+
->getMockForAbstractClass();
302+
$consumer->expects($this->any())
303+
->method('getName')
304+
->willReturn($consumerName);
305+
$consumer->expects($this->once())
306+
->method('getConnection')
307+
->willReturn($connectionName);
308+
$consumer->expects($this->any())
309+
->method('getQueue')
310+
->willReturn($queueName);
311+
$consumer->expects($this->once())
312+
->method('getOnlySpawnWhenMessageAvailable')
313+
->willReturn($onlySpawnWhenMessageAvailable);
314+
$this->consumerConfigMock->expects($this->once())
315+
->method('getConsumers')
316+
->willReturn([$consumer]);
317+
318+
$this->phpExecutableFinderMock->expects($this->once())
319+
->method('find')
320+
->willReturn('');
321+
322+
$this->lockManagerMock->expects($this->once())
323+
->method('isLocked')
324+
->willReturn(false);
325+
326+
$this->checkIsAvailableMessagesMock->expects($this->exactly((int)$onlySpawnWhenMessageAvailable))
327+
->method('execute')
328+
->willReturn($isMassagesAvailableInTheQueue);
329+
330+
$this->shellBackgroundMock->expects($this->exactly($shellBackgroundExpects))
331+
->method('execute');
332+
333+
$this->consumersRunner->run();
334+
}
335+
336+
/**
337+
* @return array
338+
*/
339+
public function runBasedOnOnlySpawnWhenMessageAvailableConsumerConfigurationDataProvider()
340+
{
341+
return [
342+
[
343+
'onlySpawnWhenMessageAvailable' => true,
344+
'isMassagesAvailableInTheQueue' => true,
345+
'shellBackgroundExpects' => 1
346+
],
347+
[
348+
'onlySpawnWhenMessageAvailable' => true,
349+
'isMassagesAvailableInTheQueue' => false,
350+
'shellBackgroundExpects' => 0
351+
],
352+
[
353+
'onlySpawnWhenMessageAvailable' => false,
354+
'isMassagesAvailableInTheQueue' => true,
355+
'shellBackgroundExpects' => 1
356+
],
357+
[
358+
'onlySpawnWhenMessageAvailable' => false,
359+
'isMassagesAvailableInTheQueue' => false,
360+
'shellBackgroundExpects' => 1
361+
],
362+
363+
];
364+
}
265365
}

lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,31 @@ public function __construct(
5656
* @param QueueInterface $queue
5757
* @param int $maxNumberOfMessages
5858
* @param \Closure $callback
59+
* @param int|null $maxIdleTime
60+
* @param int|null $sleep
5961
* @return void
62+
*
63+
* @SuppressWarnings(PHPMD.CyclomaticComplexity)
6064
*/
61-
public function invoke(QueueInterface $queue, $maxNumberOfMessages, $callback)
62-
{
65+
public function invoke(
66+
QueueInterface $queue,
67+
$maxNumberOfMessages,
68+
$callback,
69+
$maxIdleTime = null,
70+
$sleep = null
71+
) {
6372
$this->poisonPillVersion = $this->poisonPillRead->getLatestVersion();
73+
$sleep = (int) $sleep ?: 1;
74+
$maxIdleTime = $maxIdleTime ? (int) $maxIdleTime : PHP_INT_MAX;
6475
for ($i = $maxNumberOfMessages; $i > 0; $i--) {
76+
$idleStartTime = microtime(true);
6577
do {
6678
$message = $queue->dequeue();
79+
if (!$message && microtime(true) - $idleStartTime > $maxIdleTime) {
80+
break 2;
81+
}
6782
// phpcs:ignore Magento2.Functions.DiscouragedFunction
68-
} while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0));
83+
} while ($message === null && $this->isWaitingNextMessage() && (sleep($sleep) === 0));
6984

7085
if ($message === null) {
7186
break;

lib/internal/Magento/Framework/MessageQueue/CallbackInvokerInterface.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
namespace Magento\Framework\MessageQueue;
99

1010
/**
11-
* Callback invoker interface
11+
* Callback invoker interface. Invoke callbacks for consumer classes.
1212
*/
1313
interface CallbackInvokerInterface
1414
{
@@ -18,7 +18,15 @@ interface CallbackInvokerInterface
1818
* @param QueueInterface $queue
1919
* @param int $maxNumberOfMessages
2020
* @param \Closure $callback
21+
* @param int|null $maxIdleTime
22+
* @param int|null $sleep
2123
* @return void
2224
*/
23-
public function invoke(QueueInterface $queue, $maxNumberOfMessages, $callback);
25+
public function invoke(
26+
QueueInterface $queue,
27+
$maxNumberOfMessages,
28+
$callback,
29+
$maxIdleTime = null,
30+
$sleep = null
31+
);
2432
}

lib/internal/Magento/Framework/MessageQueue/Config/Consumer/ConfigReaderPlugin.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ private function getConsumerConfigDataFromQueueConfig()
6868
'consumerInstance' => $consumerData['instance_type'],
6969
'handlers' => $handlers,
7070
'connection' => $consumerData['connection'],
71-
'maxMessages' => $consumerData['max_messages']
71+
'maxMessages' => $consumerData['max_messages'],
72+
'maxIdleTime' => null,
73+
'sleep' => null,
74+
'onlySpawnWhenMessageAvailable' => false
7275
];
7376
}
7477

lib/internal/Magento/Framework/MessageQueue/Consumer.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,18 @@ public function __construct(
108108
public function process($maxNumberOfMessages = null)
109109
{
110110
$queue = $this->configuration->getQueue();
111-
111+
$maxIdleTime = $this->configuration->getMaxIdleTime();
112+
$sleep = $this->configuration->getSleep();
112113
if (!isset($maxNumberOfMessages)) {
113114
$queue->subscribe($this->getTransactionCallback($queue));
114115
} else {
115-
$this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
116+
$this->invoker->invoke(
117+
$queue,
118+
$maxNumberOfMessages,
119+
$this->getTransactionCallback($queue),
120+
$maxIdleTime,
121+
$sleep
122+
);
116123
}
117124
}
118125

0 commit comments

Comments
 (0)