Skip to content

Commit 0b41756

Browse files
authored
Ensure failed messages go to DLQ (#31)
1 parent 2df0d50 commit 0b41756

File tree

5 files changed

+49
-15
lines changed

5 files changed

+49
-15
lines changed

lib/lambdakiq/job.rb

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ def perform_error(e)
7272
@error = e
7373
else
7474
instrument :retry_stopped, error: e
75-
delete_message
75+
if should_redrive?
76+
@error = e
77+
else
78+
delete_message
79+
end
7680
end
7781
end
7882

@@ -97,10 +101,16 @@ def max_receive_count?
97101
executions > retry_limit
98102
end
99103

104+
def job_retry
105+
[active_job.lambdakiq_retry, Lambdakiq.config.max_retries, 12].compact.min
106+
end
107+
100108
def retry_limit
101-
config_retry = [Lambdakiq.config.max_retries, 12].min
102-
[ (active_job.lambdakiq_retry || config_retry),
103-
(queue.max_receive_count - 1) ].min
109+
[job_retry, (queue.max_receive_count - 1)].min
110+
end
111+
112+
def should_redrive?
113+
!queue.redrive_policy.nil? && job_retry >= queue.max_receive_count
104114
end
105115

106116
def fifo_delay?

lib/lambdakiq/queue.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ def attributes
2222
end
2323

2424
def redrive_policy
25-
@redrive_policy ||= JSON.parse(attributes['RedrivePolicy'])
25+
@redrive_policy ||= attributes['RedrivePolicy'] ? JSON.parse(attributes['RedrivePolicy']) : nil
2626
end
2727

2828
def max_receive_count
29-
redrive_policy['maxReceiveCount'].to_i
29+
redrive_policy&.dig('maxReceiveCount')&.to_i || 1
3030
end
3131

3232
def fifo?

test/cases/job_test.rb

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ class JobTest < LambdakiqSpec
5252
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
5353
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
5454
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
55-
# binding.pry ; return
5655
expect(logged_metric('retry_stopped.active_job')).must_be_nil
5756
enqueue_retry = logged_metric('enqueue_retry.active_job')
5857
expect(enqueue_retry).must_be :present?
@@ -80,9 +79,9 @@ class JobTest < LambdakiqSpec
8079
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
8180
end
8281

83-
it 'must delete message for failed jobs at the end of the queue/message max receive count' do
84-
# See ClientHelpers for setting queue to max receive count of 8.
85-
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob'
82+
it 'must delete message for failed jobs after the first try when queues do not have a redrive policy' do
83+
client.stub_responses(:get_queue_attributes, { attributes: {} })
84+
event = event_basic job_class: 'TestHelper::Jobs::ErrorJob'
8685
response = Lambdakiq::Job.handler(event)
8786
assert_response response, failures: false
8887
expect(delete_message).must_be :present?
@@ -92,6 +91,24 @@ class JobTest < LambdakiqSpec
9291
expect(logged_metric('enqueue_retry.active_job')).must_be_nil
9392
retry_stopped = logged_metric('retry_stopped.active_job')
9493
expect(retry_stopped).must_be :present?
94+
expect(retry_stopped['Executions']).must_equal 1
95+
expect(retry_stopped['ExceptionName']).must_equal 'RuntimeError'
96+
end
97+
98+
it 'must not delete message for failed jobs and instead return a failure at the end of the queue/message max receive count' do
99+
# See ClientHelpers for setting queue to max receive count of 8.
100+
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob', messageId: message_id
101+
response = Lambdakiq::Job.handler(event)
102+
103+
assert_response response, failures: true, identifiers: [message_id]
104+
expect(change_message_visibility).must_be_nil
105+
expect(delete_message).must_be_nil
106+
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
107+
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
108+
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
109+
expect(logged_metric('enqueue_retry.active_job')).must_be_nil
110+
retry_stopped = logged_metric('retry_stopped.active_job')
111+
expect(retry_stopped).must_be :present?
95112
expect(retry_stopped['Executions']).must_equal 8
96113
expect(retry_stopped['ExceptionName']).must_equal 'RuntimeError'
97114
end

test/cases/jobs/basic_job_nofifo_job_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class BasicJobNofifoTest < LambdakiqSpec
66
expect(sent_message).must_be :present?
77
end
88

9-
it 'message body has no fifo queue nave vs fifo super class ' do
9+
it 'message body has no fifo queue name vs fifo super class ' do
1010
expect(sent_message_body['queue_name']).must_equal 'lambdakiq-JobsQueue-TESTING123'
1111
end
1212

test/cases/queue_test.rb

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
require 'test_helper'
22

33
class QueueTest < LambdakiqSpec
4-
let(:queue) { Lambdakiq.client.queues[queue_name] }
4+
let(:fifo_queue) { Lambdakiq.client.queues[queue_name] }
5+
let(:non_fifo_queue) { Lambdakiq.client.queues['non-fifo-queue'] }
56

67
it '#fifo?' do
7-
expect(queue.fifo?).must_equal true
8+
expect(fifo_queue.fifo?).must_equal true
9+
expect(non_fifo_queue.fifo?).must_equal false
810
end
911

10-
it '#max_receive_count' do
11-
expect(queue.max_receive_count).must_equal 8
12+
it '#max_receive_count returns the queue redrive policy maxReceiveCount' do
13+
expect(fifo_queue.max_receive_count).must_equal 8
14+
end
15+
16+
it '#max_receive_count returns 1 when the queue does not have a redrive policy' do
17+
client.stub_responses(:get_queue_attributes, { attributes: {} })
18+
expect(fifo_queue.max_receive_count).must_equal 1
1219
end
1320
end

0 commit comments

Comments
 (0)