Skip to content

Commit cc69df2

Browse files
authored
Merge pull request #720 from cshum/master
[sqs] Dead Letter Queue Adoption
2 parents 892f579 + 128d926 commit cc69df2

File tree

3 files changed

+27
-23
lines changed

3 files changed

+27
-23
lines changed

pkg/sqs/SqsClient.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public function receiveMessage(array $args): Result
4343
return $this->callApi('receiveMessage', $args);
4444
}
4545

46+
public function changeMessageVisibility(array $args): Result
47+
{
48+
return $this->callApi('changeMessageVisibility', $args);
49+
}
50+
4651
public function purgeQueue(array $args): Result
4752
{
4853
return $this->callApi('purgeQueue', $args);

pkg/sqs/SqsConsumer.php

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,19 @@ public function reject(Message $message, bool $requeue = false): void
133133
{
134134
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);
135135

136-
$this->context->getSqsClient()->deleteMessage([
137-
'@region' => $this->queue->getRegion(),
138-
'QueueUrl' => $this->context->getQueueUrl($this->queue),
139-
'ReceiptHandle' => $message->getReceiptHandle(),
140-
]);
141-
142136
if ($requeue) {
143-
$this->context->createProducer()->send($this->queue, $message);
137+
$this->context->getSqsClient()->changeMessageVisibility([
138+
'@region' => $this->queue->getRegion(),
139+
'QueueUrl' => $this->context->getQueueUrl($this->queue),
140+
'ReceiptHandle' => $message->getReceiptHandle(),
141+
'VisibilityTimeout' => 0,
142+
]);
143+
} else {
144+
$this->context->getSqsClient()->deleteMessage([
145+
'@region' => $this->queue->getRegion(),
146+
'QueueUrl' => $this->context->getQueueUrl($this->queue),
147+
'ReceiptHandle' => $message->getReceiptHandle(),
148+
]);
144149
}
145150
}
146151

pkg/sqs/Tests/SqsConsumerTest.php

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -203,26 +203,15 @@ public function testShouldRejectMessageAndRequeue()
203203
$client = $this->createSqsClientMock();
204204
$client
205205
->expects($this->once())
206-
->method('deleteMessage')
206+
->method('changeMessageVisibility')
207207
->with($this->identicalTo([
208-
'@region' => null,
208+
'@region' => 'theRegion',
209209
'QueueUrl' => 'theQueueUrl',
210210
'ReceiptHandle' => 'theReceipt',
211+
'VisibilityTimeout' => 0,
211212
]))
212213
;
213214

214-
$message = new SqsMessage();
215-
$message->setReceiptHandle('theReceipt');
216-
217-
$destination = new SqsDestination('queue');
218-
219-
$producer = $this->createProducerMock();
220-
$producer
221-
->expects($this->once())
222-
->method('send')
223-
->with($this->identicalTo($destination), $this->identicalTo($message))
224-
;
225-
226215
$context = $this->createContextMock();
227216
$context
228217
->expects($this->once())
@@ -235,11 +224,16 @@ public function testShouldRejectMessageAndRequeue()
235224
->willReturn('theQueueUrl')
236225
;
237226
$context
238-
->expects($this->once())
227+
->expects($this->never())
239228
->method('createProducer')
240-
->willReturn($producer)
241229
;
242230

231+
$message = new SqsMessage();
232+
$message->setReceiptHandle('theReceipt');
233+
234+
$destination = new SqsDestination('queue');
235+
$destination->setRegion('theRegion');
236+
243237
$consumer = new SqsConsumer($context, $destination);
244238
$consumer->reject($message, true);
245239
}

0 commit comments

Comments
 (0)