Skip to content

Commit b0fcee8

Browse files
authored
Merge pull request #310 from php-enqueue/queue-consumer-does-not-exit-on-signal
[consumption] Fix signal handling when AMQP is used.
2 parents 3e5d338 + 2addddc commit b0fcee8

File tree

1 file changed

+36
-34
lines changed

1 file changed

+36
-34
lines changed

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -168,39 +168,47 @@ public function consume(ExtensionInterface $runtimeExtension = null)
168168
$logger = $context->getLogger() ?: new NullLogger();
169169
$logger->info('Start consuming');
170170

171-
while (true) {
172-
try {
173-
if ($this->psrContext instanceof AmqpContext) {
174-
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger, &$context) {
175-
$currentProcessor = null;
171+
if ($this->psrContext instanceof AmqpContext) {
172+
$callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger, &$context) {
173+
$currentProcessor = null;
174+
175+
/** @var PsrQueue $queue */
176+
foreach ($this->boundProcessors as list($queue, $processor)) {
177+
if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) {
178+
$currentProcessor = $processor;
179+
}
180+
}
176181

177-
/** @var PsrQueue $queue */
178-
foreach ($this->boundProcessors as list($queue, $processor)) {
179-
if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) {
180-
$currentProcessor = $processor;
181-
}
182-
}
182+
if (false == $currentProcessor) {
183+
throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName()));
184+
}
183185

184-
if (false == $currentProcessor) {
185-
throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName()));
186-
}
186+
$context = new Context($this->psrContext);
187+
$context->setLogger($logger);
188+
$context->setPsrQueue($consumer->getQueue());
189+
$context->setPsrConsumer($consumer);
190+
$context->setPsrProcessor($currentProcessor);
191+
$context->setPsrMessage($message);
187192

188-
$context = new Context($this->psrContext);
189-
$context->setLogger($logger);
190-
$context->setPsrQueue($consumer->getQueue());
191-
$context->setPsrConsumer($consumer);
192-
$context->setPsrProcessor($currentProcessor);
193-
$context->setPsrMessage($message);
193+
$this->doConsume($extension, $context);
194194

195-
$this->doConsume($extension, $context);
195+
return true;
196+
};
196197

197-
return true;
198-
};
198+
foreach ($consumers as $consumer) {
199+
/* @var AmqpConsumer $consumer */
199200

200-
foreach ($consumers as $consumer) {
201-
/* @var AmqpConsumer $consumer */
201+
$this->psrContext->subscribe($consumer, $callback);
202+
}
203+
}
202204

203-
$this->psrContext->subscribe($consumer, $callback);
205+
while (true) {
206+
try {
207+
if ($this->psrContext instanceof AmqpContext) {
208+
$extension->onBeforeReceive($context);
209+
210+
if ($context->isExecutionInterrupted()) {
211+
throw new ConsumptionInterruptedException();
204212
}
205213

206214
$this->psrContext->consume($this->receiveTimeout);
@@ -266,16 +274,14 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
266274
$consumer = $context->getPsrConsumer();
267275
$logger = $context->getLogger();
268276

269-
if (false == $context->getPsrMessage() instanceof AmqpContext) {
270-
$extension->onBeforeReceive($context);
271-
}
272-
273277
if ($context->isExecutionInterrupted()) {
274278
throw new ConsumptionInterruptedException();
275279
}
276280

277281
$message = $context->getPsrMessage();
278282
if (false == $message) {
283+
$extension->onBeforeReceive($context);
284+
279285
if ($message = $consumer->receive($this->receiveTimeout)) {
280286
$context->setPsrMessage($message);
281287
}
@@ -312,10 +318,6 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
312318
$logger->info(sprintf('Message processed: %s', $context->getResult()));
313319

314320
$extension->onPostReceived($context);
315-
316-
if ($context->getPsrMessage() instanceof AmqpContext) {
317-
$extension->onBeforeReceive($context);
318-
}
319321
} else {
320322
usleep($this->idleTimeout * 1000);
321323
$extension->onIdle($context);

0 commit comments

Comments
 (0)