Skip to content

Commit 83bd77d

Browse files
committed
[consumption] onPreReceived -> onMessageReceived
1 parent dd32103 commit 83bd77d

21 files changed

+432
-382
lines changed

pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Bundle\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
5+
use Enqueue\Consumption\Context\MessageReceived;
66
use Enqueue\Consumption\EmptyExtensionTrait;
77
use Enqueue\Consumption\ExtensionInterface;
88
use Symfony\Bridge\Doctrine\RegistryInterface;
@@ -24,10 +24,7 @@ public function __construct(RegistryInterface $registry)
2424
$this->registry = $registry;
2525
}
2626

27-
/**
28-
* {@inheritdoc}
29-
*/
30-
public function onPreReceived(Context $context)
27+
public function onMessageReceived(MessageReceived $context): void
3128
{
3229
foreach ($this->registry->getManagers() as $name => $manager) {
3330
$context->getLogger()->debug(sprintf(

pkg/enqueue-bundle/Consumption/Extension/DoctrinePingConnectionExtension.php

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Enqueue\Bundle\Consumption\Extension;
44

55
use Doctrine\DBAL\Connection;
6-
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\MessageReceived;
77
use Enqueue\Consumption\EmptyExtensionTrait;
88
use Enqueue\Consumption\ExtensionInterface;
99
use Symfony\Bridge\Doctrine\RegistryInterface;
@@ -25,10 +25,7 @@ public function __construct(RegistryInterface $registry)
2525
$this->registry = $registry;
2626
}
2727

28-
/**
29-
* {@inheritdoc}
30-
*/
31-
public function onPreReceived(Context $context)
28+
public function onMessageReceived(MessageReceived $context): void
3229
{
3330
/** @var Connection $connection */
3431
foreach ($this->registry->getConnections() as $connection) {

pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
use Doctrine\Common\Persistence\ObjectManager;
66
use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension;
7-
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\Context\MessageReceived;
88
use Interop\Queue\Consumer;
99
use Interop\Queue\Context as InteropContext;
10+
use Interop\Queue\Message;
1011
use Interop\Queue\Processor;
1112
use PHPUnit\Framework\TestCase;
1213
use Psr\Log\LoggerInterface;
@@ -42,17 +43,18 @@ public function testShouldClearIdentityMap()
4243
;
4344

4445
$extension = new DoctrineClearIdentityMapExtension($registry);
45-
$extension->onPreReceived($context);
46+
$extension->onMessageReceived($context);
4647
}
4748

48-
protected function createContext(): Context
49+
protected function createContext(): MessageReceived
4950
{
50-
$context = new Context($this->createMock(InteropContext::class));
51-
$context->setLogger($this->createMock(LoggerInterface::class));
52-
$context->setConsumer($this->createMock(Consumer::class));
53-
$context->setProcessor($this->createMock(Processor::class));
54-
55-
return $context;
51+
return new MessageReceived(
52+
$this->createMock(InteropContext::class),
53+
$this->createMock(Consumer::class),
54+
$this->createMock(Message::class),
55+
$this->createMock(Processor::class),
56+
$this->createMock(LoggerInterface::class)
57+
);
5658
}
5759

5860
/**

pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
use Doctrine\DBAL\Connection;
66
use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension;
7-
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\Context\MessageReceived;
88
use Interop\Queue\Consumer;
99
use Interop\Queue\Context as InteropContext;
10+
use Interop\Queue\Message;
1011
use Interop\Queue\Processor;
1112
use PHPUnit\Framework\TestCase;
1213
use Psr\Log\LoggerInterface;
@@ -55,7 +56,7 @@ public function testShouldNotReconnectIfConnectionIsOK()
5556
;
5657

5758
$extension = new DoctrinePingConnectionExtension($registry);
58-
$extension->onPreReceived($context);
59+
$extension->onMessageReceived($context);
5960
}
6061

6162
public function testShouldDoesReconnectIfConnectionFailed()
@@ -100,7 +101,7 @@ public function testShouldDoesReconnectIfConnectionFailed()
100101
;
101102

102103
$extension = new DoctrinePingConnectionExtension($registry);
103-
$extension->onPreReceived($context);
104+
$extension->onMessageReceived($context);
104105
}
105106

106107
public function testShouldSkipIfConnectionWasNotOpened()
@@ -143,17 +144,18 @@ public function testShouldSkipIfConnectionWasNotOpened()
143144
;
144145

145146
$extension = new DoctrinePingConnectionExtension($registry);
146-
$extension->onPreReceived($context);
147+
$extension->onMessageReceived($context);
147148
}
148149

149-
protected function createContext(): Context
150+
protected function createContext(): MessageReceived
150151
{
151-
$context = new Context($this->createMock(InteropContext::class));
152-
$context->setLogger($this->createMock(LoggerInterface::class));
153-
$context->setConsumer($this->createMock(Consumer::class));
154-
$context->setProcessor($this->createMock(Processor::class));
155-
156-
return $context;
152+
return new MessageReceived(
153+
$this->createMock(InteropContext::class),
154+
$this->createMock(Consumer::class),
155+
$this->createMock(Message::class),
156+
$this->createMock(Processor::class),
157+
$this->createMock(LoggerInterface::class)
158+
);
157159
}
158160

159161
/**

pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Enqueue\Client\ConsumptionExtension;
44

55
use Enqueue\Client\DriverInterface;
6-
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\MessageReceived;
77
use Enqueue\Consumption\EmptyExtensionTrait;
88
use Enqueue\Consumption\ExtensionInterface;
99
use Enqueue\Consumption\Result;
@@ -36,12 +36,9 @@ public function __construct(DriverInterface $driver, $delay)
3636
$this->delay = $delay;
3737
}
3838

39-
/**
40-
* {@inheritdoc}
41-
*/
42-
public function onPreReceived(Context $context)
39+
public function onMessageReceived(MessageReceived $context): void
4340
{
44-
$message = $context->getInteropMessage();
41+
$message = $context->getMessage();
4542
if (false == $message->isRedelivered()) {
4643
return;
4744
}

pkg/enqueue/Client/ConsumptionExtension/ExclusiveCommandExtension.php

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use Enqueue\Client\Config;
66
use Enqueue\Client\DriverInterface;
77
use Enqueue\Client\Route;
8-
use Enqueue\Consumption\Context;
8+
use Enqueue\Consumption\Context\MessageReceived;
99
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
1010
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
1111

@@ -28,11 +28,9 @@ public function __construct(DriverInterface $driver)
2828
$this->driver = $driver;
2929
}
3030

31-
public function onPreReceived(Context $context)
31+
public function onMessageReceived(MessageReceived $context): void
3232
{
33-
$message = $context->getInteropMessage();
34-
$queue = $context->getInteropQueue();
35-
33+
$message = $context->getMessage();
3634
if ($message->getProperty(Config::TOPIC)) {
3735
return;
3836
}
@@ -47,6 +45,7 @@ public function onPreReceived(Context $context)
4745
$this->queueToRouteMap = $this->buildMap();
4846
}
4947

48+
$queue = $context->getConsumer()->getQueue();
5049
if (array_key_exists($queue->getQueueName(), $this->queueToRouteMap)) {
5150
$context->getLogger()->debug('[ExclusiveCommandExtension] This is a exclusive command queue and client\'s properties are not set. Setting them');
5251

pkg/enqueue/Client/ConsumptionExtension/SetRouterPropertiesExtension.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Enqueue\Client\Config;
66
use Enqueue\Client\DriverInterface;
7-
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\Context\MessageReceived;
88
use Enqueue\Consumption\EmptyExtensionTrait;
99
use Enqueue\Consumption\ExtensionInterface;
1010

@@ -25,16 +25,16 @@ public function __construct(DriverInterface $driver)
2525
$this->driver = $driver;
2626
}
2727

28-
public function onPreReceived(Context $context)
28+
public function onMessageReceived(MessageReceived $context): void
2929
{
30-
$message = $context->getInteropMessage();
30+
$message = $context->getMessage();
3131
if ($message->getProperty(Config::PROCESSOR)) {
3232
return;
3333
}
3434

3535
$config = $this->driver->getConfig();
3636
$queue = $this->driver->createQueue($config->getRouterQueue());
37-
if ($context->getInteropQueue()->getQueueName() != $queue->getQueueName()) {
37+
if ($context->getConsumer()->getQueue()->getQueueName() != $queue->getQueueName()) {
3838
return;
3939
}
4040

pkg/enqueue/Client/ExtensionInterface.php

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,4 @@ public function onPreSendCommand(PreSend $context): void;
1111
public function onDriverPreSend(DriverPreSend $context): void;
1212

1313
public function onPostSend(PostSend $context): void;
14-
15-
// /**
16-
// * @deprecated
17-
// */
18-
// public function onPreSend($topic, Message $message);
19-
//
20-
// /**
21-
// * @deprecated
22-
// */
23-
// public function onPostSend($topic, Message $message);
2414
}

pkg/enqueue/Consumption/ChainExtension.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\MessageReceived;
56
use Enqueue\Consumption\Context\PreConsume;
67
use Enqueue\Consumption\Context\PreSubscribe;
78
use Enqueue\Consumption\Context\Start;
@@ -47,10 +48,10 @@ public function onPreConsume(PreConsume $context): void
4748
}
4849
}
4950

50-
public function onPreReceived(Context $context)
51+
public function onMessageReceived(MessageReceived $context): void
5152
{
5253
foreach ($this->extensions as $extension) {
53-
$extension->onPreReceived($context);
54+
$extension->onMessageReceived($context);
5455
}
5556
}
5657

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Context;
4+
5+
use Enqueue\Consumption\Result;
6+
use Interop\Queue\Consumer;
7+
use Interop\Queue\Context;
8+
use Interop\Queue\Message;
9+
use Interop\Queue\Processor;
10+
use Psr\Log\LoggerInterface;
11+
12+
final class MessageReceived
13+
{
14+
/**
15+
* @var Context
16+
*/
17+
private $context;
18+
19+
/**
20+
* @var Consumer
21+
*/
22+
private $consumer;
23+
24+
/**
25+
* @var Message
26+
*/
27+
private $message;
28+
29+
/**
30+
* @var Processor
31+
*/
32+
private $processor;
33+
34+
/**
35+
* @var LoggerInterface
36+
*/
37+
private $logger;
38+
39+
/**
40+
* @var Result|null
41+
*/
42+
private $result;
43+
44+
public function __construct(Context $context, Consumer $consumer, Message $message, Processor $processor, LoggerInterface $logger)
45+
{
46+
$this->context = $context;
47+
$this->consumer = $consumer;
48+
$this->message = $message;
49+
$this->processor = $processor;
50+
$this->logger = $logger;
51+
}
52+
53+
public function getContext(): Context
54+
{
55+
return $this->context;
56+
}
57+
58+
public function getConsumer(): Consumer
59+
{
60+
return $this->consumer;
61+
}
62+
63+
public function getMessage(): Message
64+
{
65+
return $this->message;
66+
}
67+
68+
public function getProcessor(): Processor
69+
{
70+
return $this->processor;
71+
}
72+
73+
public function changeProcessor(Processor $processor): void
74+
{
75+
$this->processor = $processor;
76+
}
77+
78+
public function getLogger(): LoggerInterface
79+
{
80+
return $this->logger;
81+
}
82+
83+
public function getResult(): ?Result
84+
{
85+
return $this->result;
86+
}
87+
88+
public function setResult(Result $result): void
89+
{
90+
$this->result = $result;
91+
}
92+
}

pkg/enqueue/Consumption/EmptyExtensionTrait.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\MessageReceived;
56
use Enqueue\Consumption\Context\PreConsume;
67
use Enqueue\Consumption\Context\PreSubscribe;
78
use Enqueue\Consumption\Context\Start;
@@ -20,7 +21,7 @@ public function onPreConsume(PreConsume $context): void
2021
{
2122
}
2223

23-
public function onPreReceived(Context $context)
24+
public function onMessageReceived(MessageReceived $context): void
2425
{
2526
}
2627

pkg/enqueue/Consumption/Extension/SignalExtension.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption\Extension;
44

55
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\MessageReceived;
67
use Enqueue\Consumption\Context\PreConsume;
78
use Enqueue\Consumption\Context\Start;
89
use Enqueue\Consumption\EmptyExtensionTrait;
@@ -49,11 +50,8 @@ public function onPreConsume(PreConsume $context): void
4950
}
5051
}
5152

52-
public function onPreReceived(Context $context)
53+
public function onMessageReceived(MessageReceived $context): void
5354
{
54-
if ($this->shouldBeStopped($context->getLogger())) {
55-
$context->setExecutionInterrupted(true);
56-
}
5755
}
5856

5957
public function onPostReceived(Context $context)

0 commit comments

Comments
 (0)