Skip to content

Commit c2e5d02

Browse files
committed
[consumption] add already_ack result, queue consumer does nothing on it.
1 parent 5faac4f commit c2e5d02

File tree

3 files changed

+38
-0
lines changed

3 files changed

+38
-0
lines changed

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,8 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
274274
case Result::REQUEUE:
275275
$consumer->reject($message, true);
276276
break;
277+
case Result::ALREADY_ACKNOWLEDGED:
278+
break;
277279
default:
278280
throw new \LogicException(sprintf('Status is not supported: %s', $result));
279281
}

pkg/enqueue/Consumption/Result.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ class Result
2222
*/
2323
const REQUEUE = Processor::REQUEUE;
2424

25+
const ALREADY_ACKNOWLEDGED = 'enqueue.already_acknowledged';
26+
2527
/**
2628
* @var string
2729
*/

pkg/enqueue/Tests/Consumption/QueueConsumerTest.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,40 @@ public function testShouldRejectMessageIfProcessorReturnSuchStatus()
470470
$queueConsumer->consume();
471471
}
472472

473+
public function testShouldDoNothingIfProcessorReturnsAlreadyAcknowledged()
474+
{
475+
$messageMock = $this->createMessageMock();
476+
477+
$subscriptionConsumerMock = new DummySubscriptionConsumer();
478+
$subscriptionConsumerMock->addMessage($messageMock, 'foo_queue');
479+
480+
$consumerStub = $this->createConsumerStub('foo_queue');
481+
$consumerStub
482+
->expects($this->never())
483+
->method('reject')
484+
;
485+
$consumerStub
486+
->expects($this->never())
487+
->method('acknowledge')
488+
;
489+
490+
$contextStub = $this->createContextStub($consumerStub);
491+
492+
$processorMock = $this->createProcessorMock();
493+
$processorMock
494+
->expects($this->once())
495+
->method('process')
496+
->with($this->identicalTo($messageMock))
497+
->willReturn(Result::ALREADY_ACKNOWLEDGED)
498+
;
499+
500+
$queueConsumer = new QueueConsumer($contextStub, new BreakCycleExtension(1));
501+
$queueConsumer->setFallbackSubscriptionConsumer($subscriptionConsumerMock);
502+
$queueConsumer->bind(new NullQueue('foo_queue'), $processorMock);
503+
504+
$queueConsumer->consume();
505+
}
506+
473507
public function testShouldRequeueMessageIfProcessorReturnSuchStatus()
474508
{
475509
$messageMock = $this->createMessageMock();

0 commit comments

Comments
 (0)