Skip to content

[consumption] Rework QueueConsumer extension points. #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Oct 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ enqueue:
redelivered_delay_time: 0
consumption:

# the time in milliseconds queue consumer waits if no message received
idle_timeout: 0

# the time in milliseconds queue consumer waits for a message (100 ms by default)
receive_timeout: 100
job: false
Expand Down
14 changes: 4 additions & 10 deletions docs/bundle/consumption_extension.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,14 @@ Let's first create an extension itself:
// src/AppBundle/Enqueue;
namespace AppBundle\Enqueue;

use Enqueue\Consumption\ExtensionInterface;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
use Enqueue\Consumption\Context\PostMessageReceived;

class CountProcessedMessagesExtension implements ExtensionInterface
class CountProcessedMessagesExtension implements PostMessageReceivedExtensionInterface
{
use EmptyExtensionTrait;

private $processedMessages = 0;

/**
* {@inheritdoc}
*/
public function onPostReceived(Context $context)
public function onPostMessageReceived(PostMessageReceived $context): void
{
$this->processedMessages += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

namespace Enqueue\Bundle\Consumption\Extension;

use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\MessageReceivedExtensionInterface;
use Symfony\Bridge\Doctrine\RegistryInterface;

class DoctrineClearIdentityMapExtension implements ExtensionInterface
class DoctrineClearIdentityMapExtension implements MessageReceivedExtensionInterface
{
use EmptyExtensionTrait;

/**
* @var RegistryInterface
*/
Expand All @@ -24,10 +21,7 @@ public function __construct(RegistryInterface $registry)
$this->registry = $registry;
}

/**
* {@inheritdoc}
*/
public function onPreReceived(Context $context)
public function onMessageReceived(MessageReceived $context): void
{
foreach ($this->registry->getManagers() as $name => $manager) {
$context->getLogger()->debug(sprintf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
namespace Enqueue\Bundle\Consumption\Extension;

use Doctrine\DBAL\Connection;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\MessageReceivedExtensionInterface;
use Symfony\Bridge\Doctrine\RegistryInterface;

class DoctrinePingConnectionExtension implements ExtensionInterface
class DoctrinePingConnectionExtension implements MessageReceivedExtensionInterface
{
use EmptyExtensionTrait;

/**
* @var RegistryInterface
*/
Expand All @@ -25,10 +22,7 @@ public function __construct(RegistryInterface $registry)
$this->registry = $registry;
}

/**
* {@inheritdoc}
*/
public function onPreReceived(Context $context)
public function onMessageReceived(MessageReceived $context): void
{
/** @var Connection $connection */
foreach ($this->registry->getConnections() as $connection) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public function load(array $configs, ContainerBuilder $container): void
}

$container->getDefinition('enqueue.client.default.queue_consumer')
->replaceArgument(2, $config['consumption']['idle_time'])
->replaceArgument(3, $config['consumption']['receive_timeout'])
->replaceArgument(4, $config['consumption']['receive_timeout'])
;
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue-bundle/Resources/config/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ services:
arguments:
- '@enqueue.client.default.context'
- '@enqueue.client.default.consumption_extensions'
- []
- null
- ~
- ~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

use Doctrine\Common\Persistence\ObjectManager;
use Enqueue\Bundle\Consumption\Extension\DoctrineClearIdentityMapExtension;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\Context\MessageReceived;
use Interop\Queue\Consumer;
use Interop\Queue\Context as InteropContext;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -42,17 +43,19 @@ public function testShouldClearIdentityMap()
;

$extension = new DoctrineClearIdentityMapExtension($registry);
$extension->onPreReceived($context);
$extension->onMessageReceived($context);
}

protected function createContext(): Context
protected function createContext(): MessageReceived
{
$context = new Context($this->createMock(InteropContext::class));
$context->setLogger($this->createMock(LoggerInterface::class));
$context->setConsumer($this->createMock(Consumer::class));
$context->setProcessor($this->createMock(Processor::class));

return $context;
return new MessageReceived(
$this->createMock(InteropContext::class),
$this->createMock(Consumer::class),
$this->createMock(Message::class),
$this->createMock(Processor::class),
1,
$this->createMock(LoggerInterface::class)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

use Doctrine\DBAL\Connection;
use Enqueue\Bundle\Consumption\Extension\DoctrinePingConnectionExtension;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\Context\MessageReceived;
use Interop\Queue\Consumer;
use Interop\Queue\Context as InteropContext;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -55,7 +56,7 @@ public function testShouldNotReconnectIfConnectionIsOK()
;

$extension = new DoctrinePingConnectionExtension($registry);
$extension->onPreReceived($context);
$extension->onMessageReceived($context);
}

public function testShouldDoesReconnectIfConnectionFailed()
Expand Down Expand Up @@ -100,7 +101,7 @@ public function testShouldDoesReconnectIfConnectionFailed()
;

$extension = new DoctrinePingConnectionExtension($registry);
$extension->onPreReceived($context);
$extension->onMessageReceived($context);
}

public function testShouldSkipIfConnectionWasNotOpened()
Expand Down Expand Up @@ -143,17 +144,19 @@ public function testShouldSkipIfConnectionWasNotOpened()
;

$extension = new DoctrinePingConnectionExtension($registry);
$extension->onPreReceived($context);
$extension->onMessageReceived($context);
}

protected function createContext(): Context
protected function createContext(): MessageReceived
{
$context = new Context($this->createMock(InteropContext::class));
$context->setLogger($this->createMock(LoggerInterface::class));
$context->setConsumer($this->createMock(Consumer::class));
$context->setProcessor($this->createMock(Processor::class));

return $context;
return new MessageReceived(
$this->createMock(InteropContext::class),
$this->createMock(Consumer::class),
$this->createMock(Message::class),
$this->createMock(Processor::class),
1,
$this->createMock(LoggerInterface::class)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public function testShouldUseDefaultConfigurationIfNothingIsConfiguredAtAll()
$this->assertEquals([
'transport' => ['dsn' => 'null:'],
'consumption' => [
'idle_time' => 0,
'receive_timeout' => 10000,
],
'job' => false,
Expand All @@ -66,7 +65,6 @@ public function testShouldUseDefaultTransportIfIfTransportIsConfiguredAtAll()
$this->assertEquals([
'transport' => ['dsn' => 'null:'],
'consumption' => [
'idle_time' => 0,
'receive_timeout' => 10000,
],
'job' => false,
Expand Down Expand Up @@ -383,7 +381,6 @@ public function testShouldSetDefaultConfigurationForConsumption()

$this->assertArraySubset([
'consumption' => [
'idle_time' => 0,
'receive_timeout' => 10000,
],
], $config);
Expand All @@ -397,14 +394,12 @@ public function testShouldAllowConfigureConsumption()
$config = $processor->processConfiguration($configuration, [[
'transport' => [],
'consumption' => [
'idle_time' => 123,
'receive_timeout' => 456,
],
]]);

$this->assertArraySubset([
'consumption' => [
'idle_time' => 123,
'receive_timeout' => 456,
],
], $config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,21 +430,17 @@ public function testShouldConfigureQueueConsumer()
'transport' => [
],
'consumption' => [
'idle_time' => 123,
'receive_timeout' => 456,
],
]], $container);

$def = $container->getDefinition('enqueue.transport.default.queue_consumer');
$this->assertSame('%enqueue.transport.default.idle_time%', $def->getArgument(2));
$this->assertSame('%enqueue.transport.default.receive_timeout%', $def->getArgument(3));
$this->assertSame('%enqueue.transport.default.receive_timeout%', $def->getArgument(4));

$this->assertSame(123, $container->getParameter('enqueue.transport.default.idle_time'));
$this->assertSame(456, $container->getParameter('enqueue.transport.default.receive_timeout'));

$def = $container->getDefinition('enqueue.client.default.queue_consumer');
$this->assertSame(123, $def->getArgument(2));
$this->assertSame(456, $def->getArgument(3));
$this->assertSame(456, $def->getArgument(4));
}

public function testShouldLoadProcessAutoconfigureChildDefinition()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
namespace Enqueue\Client\ConsumptionExtension;

use Enqueue\Client\DriverInterface;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\MessageReceivedExtensionInterface;
use Enqueue\Consumption\Result;

class DelayRedeliveredMessageExtension implements ExtensionInterface
class DelayRedeliveredMessageExtension implements MessageReceivedExtensionInterface
{
use EmptyExtensionTrait;

const PROPERTY_REDELIVER_COUNT = 'enqueue.redelivery_count';

/**
Expand All @@ -36,12 +33,9 @@ public function __construct(DriverInterface $driver, $delay)
$this->delay = $delay;
}

/**
* {@inheritdoc}
*/
public function onPreReceived(Context $context)
public function onMessageReceived(MessageReceived $context): void
{
$message = $context->getInteropMessage();
$message = $context->getMessage();
if (false == $message->isRedelivered()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Route;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
use Enqueue\Consumption\Context\MessageReceived;
use Enqueue\Consumption\MessageReceivedExtensionInterface;

final class ExclusiveCommandExtension implements ConsumptionExtensionInterface
final class ExclusiveCommandExtension implements MessageReceivedExtensionInterface
{
use ConsumptionEmptyExtensionTrait;

/**
* @var DriverInterface
*/
Expand All @@ -28,11 +25,9 @@ public function __construct(DriverInterface $driver)
$this->driver = $driver;
}

public function onPreReceived(Context $context)
public function onMessageReceived(MessageReceived $context): void
{
$message = $context->getInteropMessage();
$queue = $context->getInteropQueue();

$message = $context->getMessage();
if ($message->getProperty(Config::TOPIC)) {
return;
}
Expand All @@ -47,6 +42,7 @@ public function onPreReceived(Context $context)
$this->queueToRouteMap = $this->buildMap();
}

$queue = $context->getConsumer()->getQueue();
if (array_key_exists($queue->getQueueName(), $this->queueToRouteMap)) {
$context->getLogger()->debug('[ExclusiveCommandExtension] This is a exclusive command queue and client\'s properties are not set. Setting them');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
namespace Enqueue\Client\ConsumptionExtension;

use Enqueue\Client\SpoolProducer;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface;
use Enqueue\Consumption\Context\End;
use Enqueue\Consumption\Context\PostMessageReceived;
use Enqueue\Consumption\EndExtensionInterface;
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;

class FlushSpoolProducerExtension implements ExtensionInterface
class FlushSpoolProducerExtension implements PostMessageReceivedExtensionInterface, EndExtensionInterface
{
use EmptyExtensionTrait;

/**
* @var SpoolProducer
*/
Expand All @@ -24,15 +23,12 @@ public function __construct(SpoolProducer $producer)
$this->producer = $producer;
}

/**
* {@inheritdoc}
*/
public function onPostReceived(Context $context)
public function onPostMessageReceived(PostMessageReceived $context): void
{
$this->producer->flush();
}

public function onInterrupted(Context $context)
public function onEnd(End $context): void
{
$this->producer->flush();
}
Expand Down
Loading