Skip to content

Commit a48a507

Browse files
authored
Merge pull request #4072 from magento-honey-badgers/MC-5588-mysqlmq
[honey] MC-5588: When using MysqlMQ messages are always set to complete even if exception occurs
2 parents 789dc34 + 5e17f74 commit a48a507

File tree

18 files changed

+305
-255
lines changed

18 files changed

+305
-255
lines changed

app/code/Magento/MysqlMq/Model/Driver/Queue.php

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public function __construct(
7373
}
7474

7575
/**
76-
* {@inheritdoc}
76+
* @inheritdoc
7777
*/
7878
public function dequeue()
7979
{
@@ -92,7 +92,7 @@ public function dequeue()
9292
}
9393

9494
/**
95-
* {@inheritdoc}
95+
* @inheritdoc
9696
*/
9797
public function acknowledge(EnvelopeInterface $envelope)
9898
{
@@ -103,25 +103,26 @@ public function acknowledge(EnvelopeInterface $envelope)
103103
}
104104

105105
/**
106-
* {@inheritdoc}
106+
* @inheritdoc
107107
*/
108108
public function subscribe($callback)
109109
{
110110
while (true) {
111111
while ($envelope = $this->dequeue()) {
112112
try {
113+
// phpcs:ignore Magento2.Functions.DiscouragedFunction
113114
call_user_func($callback, $envelope);
114-
$this->acknowledge($envelope);
115115
} catch (\Exception $e) {
116116
$this->reject($envelope);
117117
}
118118
}
119+
// phpcs:ignore Magento2.Functions.DiscouragedFunction
119120
sleep($this->interval);
120121
}
121122
}
122123

123124
/**
124-
* {@inheritdoc}
125+
* @inheritdoc
125126
*/
126127
public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionMessage = null)
127128
{
@@ -139,7 +140,7 @@ public function reject(EnvelopeInterface $envelope, $requeue = true, $rejectionM
139140
}
140141

141142
/**
142-
* {@inheritDoc}
143+
* @inheritDoc
143144
*/
144145
public function push(EnvelopeInterface $envelope)
145146
{

dev/tests/integration/testsuite/Magento/MysqlMq/Model/DataObject.php renamed to dev/tests/integration/_files/Magento/TestModuleMysqlMq/Model/DataObject.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6-
namespace Magento\MysqlMq\Model;
6+
namespace Magento\TestModuleMysqlMq\Model;
77

88
class DataObject extends \Magento\Framework\Api\AbstractExtensibleObject
99
{
@@ -40,4 +40,21 @@ public function setEntityId($entityId)
4040
{
4141
return $this->setData('entity_id', $entityId);
4242
}
43+
44+
/**
45+
* @return string
46+
*/
47+
public function getOutputPath()
48+
{
49+
return $this->_get('outputPath');
50+
}
51+
52+
/**
53+
* @param string $path
54+
* @return $this
55+
*/
56+
public function setOutputPath($path)
57+
{
58+
return $this->setData('outputPath', $path);
59+
}
4360
}

dev/tests/integration/testsuite/Magento/MysqlMq/Model/DataObjectRepository.php renamed to dev/tests/integration/_files/Magento/TestModuleMysqlMq/Model/DataObjectRepository.php

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,25 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6-
namespace Magento\MysqlMq\Model;
6+
namespace Magento\TestModuleMysqlMq\Model;
77

88
class DataObjectRepository
99
{
1010
/**
1111
* @param DataObject $dataObject
1212
* @param string $requiredParam
1313
* @param int|null $optionalParam
14-
* @return bool
14+
* @return null
1515
*/
1616
public function delayedOperation(
17-
\Magento\MysqlMq\Model\DataObject $dataObject,
17+
\Magento\TestModuleMysqlMq\Model\DataObject $dataObject,
1818
$requiredParam,
1919
$optionalParam = null
2020
) {
21-
echo "Processed '{$dataObject->getEntityId()}'; "
21+
$output = "Processed '{$dataObject->getEntityId()}'; "
2222
. "Required param '{$requiredParam}'; Optional param '{$optionalParam}'\n";
23-
return true;
23+
file_put_contents($dataObject->getOutputPath(), $output);
24+
25+
return null;
2426
}
2527
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
namespace Magento\TestModuleMysqlMq\Model;
7+
8+
/**
9+
* Test message processor is used by \Magento\MysqlMq\Model\PublisherConsumerTest
10+
*/
11+
class Processor
12+
{
13+
/**
14+
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
15+
*/
16+
public function processMessage($message)
17+
{
18+
file_put_contents(
19+
$message->getOutputPath(),
20+
"Processed {$message->getEntityId()}" . PHP_EOL,
21+
FILE_APPEND
22+
);
23+
}
24+
25+
/**
26+
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
27+
*/
28+
public function processObjectCreated($message)
29+
{
30+
file_put_contents(
31+
$message->getOutputPath(),
32+
"Processed object created {$message->getEntityId()}" . PHP_EOL,
33+
FILE_APPEND
34+
);
35+
}
36+
37+
/**
38+
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
39+
*/
40+
public function processCustomObjectCreated($message)
41+
{
42+
file_put_contents(
43+
$message->getOutputPath(),
44+
"Processed custom object created {$message->getEntityId()}" . PHP_EOL,
45+
FILE_APPEND
46+
);
47+
}
48+
49+
/**
50+
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
51+
*/
52+
public function processObjectUpdated($message)
53+
{
54+
file_put_contents(
55+
$message->getOutputPath(),
56+
"Processed object updated {$message->getEntityId()}" . PHP_EOL,
57+
FILE_APPEND
58+
);
59+
}
60+
61+
/**
62+
* @param \Magento\TestModuleMysqlMq\Model\DataObject $message
63+
*/
64+
public function processMessageWithException($message)
65+
{
66+
file_put_contents($message->getOutputPath(), "Exception processing {$message->getEntityId()}");
67+
throw new \LogicException(
68+
"Exception during message processing happened. Entity: {{$message->getEntityId()}}"
69+
);
70+
}
71+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
9+
<topic name="demo.exception" request="Magento\TestModuleMysqlMq\Model\DataObject"/>
10+
<topic name="test.schema.defined.by.method" schema="Magento\TestModuleMysqlMq\Model\DataObjectRepository::delayedOperation" is_synchronous="false"/>
11+
<topic name="demo.object.created" request="Magento\TestModuleMysqlMq\Model\DataObject"/>
12+
<topic name="demo.object.updated" request="Magento\TestModuleMysqlMq\Model\DataObject"/>
13+
<topic name="demo.object.custom.created" request="Magento\TestModuleMysqlMq\Model\DataObject"/>
14+
</config>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
9+
<module name="Magento_TestModuleMysqlMq" active="true">
10+
</module>
11+
</config>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd">
9+
<broker topic="demo.exception" type="db" exchange="magento">
10+
<queue consumer="demoConsumerWithException" name="queue-exception" handler="Magento\TestModuleMysqlMq\Model\Processor::processMessageWithException"/>
11+
</broker>
12+
<broker topic="test.schema.defined.by.method" type="db" exchange="magento">
13+
<queue consumer="delayedOperationConsumer" name="demo-queue-6" handler="Magento\TestModuleMysqlMq\Model\DataObjectRepository::delayedOperation"/>
14+
</broker>
15+
<broker topic="demo.object.created" type="db" exchange="magento">
16+
<queue consumer="demoConsumerQueueOne" name="queue-created" handler="\Magento\TestModuleMysqlMq\Model\Processor::processObjectCreated"/>
17+
</broker>
18+
<broker topic="demo.object.updated" exchange="magento" type="db">
19+
<queue consumer="demoConsumerQueueTwo" name="queue-updated" handler="\Magento\TestModuleMysqlMq\Model\Processor::processObjectUpdated"/>
20+
</broker>
21+
<broker topic="demo.object.custom.created" exchange="magento" type="db">
22+
<queue consumer="demoConsumerQueueThree" name="queue-custom-created" handler="\Magento\TestModuleMysqlMq\Model\Processor::processCustomObjectCreated"/>
23+
</broker>
24+
</config>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
9+
<consumer name="demoConsumerQueueOne" queue="queue-created" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processObjectCreated"/>
10+
<consumer name="demoConsumerQueueTwo" queue="queue-updated" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processObjectUpdated"/>
11+
<consumer name="demoConsumerQueueThree" queue="queue-custom-created" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processCustomObjectCreated"/>
12+
<consumer name="demoConsumerWithException" queue="queue-exception" connection="db" handler="Magento\TestModuleMysqlMq\Model\Processor::processMessageWithException"/>
13+
<consumer name="delayedOperationConsumer" queue="demo-queue-6" connection="db" handler="Magento\TestModuleMysqlMq\Model\DataObjectRepository::delayedOperation"/>
14+
</config>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
9+
<publisher topic="demo.exception">
10+
<connection name="db" exchange="magento"/>
11+
</publisher>
12+
<publisher topic="test.schema.defined.by.method">
13+
<connection name="db" exchange="magento"/>
14+
</publisher>
15+
<publisher topic="demo.object.created">
16+
<connection name="db" exchange="magento"/>
17+
</publisher>
18+
<publisher topic="demo.object.updated">
19+
<connection name="db" exchange="magento"/>
20+
</publisher>
21+
<publisher topic="demo.object.custom.created">
22+
<connection name="db" exchange="magento"/>
23+
</publisher>
24+
</config>
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
9+
10+
<exchange name="magento" type="topic" connection="db">
11+
<binding id="demo.exception.consumer" topic="demo.exception" destination="queue-exception" destinationType="queue"/>
12+
<binding id="test.schema.defined.by.method" topic="test.schema.defined.by.method" destination="demo-queue-6" destinationType="queue"/>
13+
<binding id="demo.object.created" topic="demo.object.created" destination="queue-created" destinationType="queue"/>
14+
<binding id="demo.object.updated" topic="demo.object.updated" destination="queue-updated" destinationType="queue"/>
15+
<binding id="demo.object.all" topic="demo.object.*" destination="queue-custom-created" destinationType="queue"/>
16+
</exchange>
17+
</config>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
7+
use Magento\Framework\Component\ComponentRegistrar;
8+
9+
$registrar = new ComponentRegistrar();
10+
if ($registrar->getPath(ComponentRegistrar::MODULE, 'Magento_TestModuleMysqlMq') === null) {
11+
ComponentRegistrar::register(ComponentRegistrar::MODULE, 'Magento_TestModuleMysqlMq', __DIR__);
12+
}

dev/tests/integration/testsuite/Magento/Framework/MessageQueue/_files/communication.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
-->
88
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
99
<topic name="topic.broker.test" request="string" response="string">
10-
<handler name="topicBrokerHandler" type="Magento\MysqlMq\Model\Processor" method="processMessage"/>
10+
<handler name="topicBrokerHandler" type="Magento\TestModuleMysqlMq\Model\Processor" method="processMessage"/>
1111
</topic>
1212
</config>

dev/tests/integration/testsuite/Magento/Framework/MessageQueue/_files/valid_expected_queue.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
"name" => "publisher5.topic",
4141
"schema" => [
4242
"schema_type" => "object",
43-
"schema_value" => '\\' . \Magento\MysqlMq\Model\DataObject::class
43+
"schema_value" => '\\' . \Magento\TestModuleMysqlMq\Model\DataObject::class
4444
],
4545
"response_schema" => [
4646
"schema_type" => "object",
@@ -58,7 +58,7 @@
5858
"handlers" => [
5959
"topic.broker.test" => [
6060
"0" => [
61-
"type" => \Magento\MysqlMq\Model\Processor::class,
61+
"type" => \Magento\TestModuleMysqlMq\Model\Processor::class,
6262
"method" => "processMessage"
6363
]
6464
]

dev/tests/integration/testsuite/Magento/Framework/MessageQueue/_files/valid_queue_input.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
"name" => "publisher5.topic",
2424
"schema" => [
2525
"schema_type" => "object",
26-
"schema_value" => "Magento\\MysqlMq\\Model\\DataObject"
26+
"schema_value" => \Magento\TestModuleMysqlMq\Model\DataObject::class
2727
],
2828
"response_schema" => [
2929
"schema_type" => "object",
30-
"schema_value" => "Magento\\Customer\\Api\\Data\\CustomerInterface"
30+
"schema_value" => \Magento\Customer\Api\Data\CustomerInterface::class
3131
],
3232
"publisher" => "test-publisher-5"
3333
]

dev/tests/integration/testsuite/Magento/MysqlMq/Model/Processor.php

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)