Skip to content

Commit 760be27

Browse files
committed
Implement Worker mixin with lambdakiq_options.
1 parent e70546b commit 760be27

File tree

10 files changed

+113
-12
lines changed

10 files changed

+113
-12
lines changed

TODO.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ Q: How do I handle job priorities?
4848
A: Use different queues.
4949

5050
* How we allow FIFO queues to work with delay using message visibility.
51+
* Your SQS queue must have a `RedrivePolicy` policy!
52+
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html#aws-sqs-queue-redrive
5153

5254
## Our Siqekiq Interfaces
5355

lib/lambdakiq/job.rb

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,20 @@ def queue
3636
Lambdakiq.client.queues[active_job.queue_name]
3737
end
3838

39+
def executions
40+
active_job.executions
41+
end
42+
3943
def perform
4044
fifo_delay? ? fifo_delay : execute
41-
delete_message
42-
rescue Exception => e
43-
perform_error(e)
4445
end
4546

4647
def execute
4748
ActiveJob::Base.execute(job_data)
49+
delete_message
50+
rescue Exception => e
51+
increment_executions
52+
perform_error(e)
4853
end
4954

5055
private
@@ -79,7 +84,13 @@ def client
7984
end
8085

8186
def max_receive_count?
82-
record.max_receive_count? || record.receive_count >= queue.max_receive_count
87+
executions > retry_limit
88+
end
89+
90+
def retry_limit
91+
config_retry = [Lambdakiq.config.max_retries, 12].min
92+
[ (active_job.lambdakiq_retry || config_retry),
93+
(queue.max_receive_count - 1) ].min
8394
end
8495

8596
def fifo_delay?
@@ -91,5 +102,9 @@ def fifo_delay
91102
client.change_message_visibility(params)
92103
end
93104

105+
def increment_executions
106+
active_job.executions = active_job.executions + 1
107+
end
108+
94109
end
95110
end

lib/lambdakiq/record.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ def receive_count
5050
@receive_count ||= attributes['ApproximateReceiveCount'].to_i
5151
end
5252

53-
def max_receive_count?
54-
receive_count >= Lambdakiq.config.max_retries
55-
end
56-
5753
def next_visibility_timeout
5854
@next_visibility_timeout ||= Backoff.backoff(receive_count)
5955
end

lib/lambdakiq/worker.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,24 @@
11
module Lambdakiq
22
module Worker
3+
extend ActiveSupport::Concern
4+
5+
included do
6+
class_attribute :lambdakiq_options_hash,
7+
instance_predicate: false,
8+
default: Hash.new
9+
end
10+
11+
class_methods do
12+
13+
def lambdakiq_options(options = {})
14+
self.lambdakiq_options_hash = options.symbolize_keys
15+
end
16+
17+
end
18+
19+
def lambdakiq_retry
20+
lambdakiq_options_hash[:retry]
21+
end
322

423
end
524
end

test/cases/job_test.rb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,23 @@
11
require 'test_helper'
22

33
class JobTest < LambdakiqSpec
4+
5+
it '#active_job - a deserialize representation of what will be executed' do
6+
aj = job.active_job
7+
expect(aj).must_be_instance_of TestHelper::Jobs::BasicJob
8+
expect(aj.job_id).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3'
9+
expect(aj.queue_name).must_equal queue_name
10+
expect(aj.enqueued_at).must_equal '2020-11-30T13:07:36Z'
11+
expect(aj.executions).must_equal 0
12+
expect(aj.provider_job_id).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807'
13+
end
14+
15+
it '#active_job - executions uses ApproximateReceiveCount' do
16+
event = event_basic attributes: { ApproximateReceiveCount: '3' }
17+
aj = job(event: event).active_job
18+
expect(aj.executions).must_equal 2
19+
end
20+
421
it 'must perform basic job' do
522
Lambdakiq::Job.handler(event_basic)
623
expect(delete_message).must_be :present?
@@ -44,6 +61,7 @@ class JobTest < LambdakiqSpec
4461
end
4562

4663
it 'must delete message for failed jobs at the end of the queue/message max receive count' do
64+
# See ClientHelpers for setting queue to max receive count of 8.
4765
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob'
4866
Lambdakiq::Job.handler(event)
4967
expect(delete_message).must_be :present?
@@ -55,6 +73,7 @@ class JobTest < LambdakiqSpec
5573
it 'must not perform and allow fifo queue to use message visibility as delay' do
5674
event = event_basic_delay minutes: 6
5775
Lambdakiq::Job.handler(event)
76+
expect(delete_message).must_be :blank?
5877
expect(change_message_visibility).must_be :present?
5978
expect(change_message_visibility_params[:visibility_timeout]).must_equal 6.minutes
6079
expect(perform_buffer_last_value).must_be_nil
@@ -64,6 +83,7 @@ class JobTest < LambdakiqSpec
6483
it 'must not perform and allow fifo queue to use message visibility as delay (using SentTimestamp)' do
6584
event = event_basic_delay minutes: 10, timestamp: 2.minutes.ago.strftime('%s%3N')
6685
Lambdakiq::Job.handler(event)
86+
expect(delete_message).must_be :blank?
6787
expect(change_message_visibility).must_be :present?
6888
expect(change_message_visibility_params[:visibility_timeout]).must_equal 8.minutes
6989
expect(perform_buffer_last_value).must_be_nil
@@ -79,4 +99,32 @@ class JobTest < LambdakiqSpec
7999
expect(logger).must_include 'Performing TestHelper::Jobs::BasicJob'
80100
expect(logger).must_include 'Performed TestHelper::Jobs::BasicJob'
81101
end
102+
103+
it 'must use `lambdakiq_options` retry options set to 0 and not retry job' do
104+
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobNoRetry'
105+
Lambdakiq::Job.handler(event)
106+
expect(delete_message).must_be :present?
107+
expect(perform_buffer_last_value).must_equal 'ErrorJobNoRetry with: "test"'
108+
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJobNoRetry'
109+
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJobNoRetry'
110+
end
111+
112+
it 'must use `lambdakiq_options` retry options set to 1 and retry job' do
113+
event = event_basic job_class: 'TestHelper::Jobs::ErrorJobOneRetry'
114+
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
115+
expect(delete_message).must_be :blank?
116+
expect(perform_buffer_last_value).must_equal 'ErrorJobOneRetry with: "test"'
117+
expect(change_message_visibility).must_be :present?
118+
expect(change_message_visibility_params[:visibility_timeout]).must_equal 30.seconds
119+
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJobOneRetry'
120+
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJobOneRetry'
121+
end
122+
123+
private
124+
125+
def job(event: event_basic)
126+
record = Lambdakiq::Event.records(event).first
127+
Lambdakiq::Job.new(record)
128+
end
129+
82130
end

test/cases/record_test.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,4 @@ class RecordTest < LambdakiqSpec
3535
expect(record.receive_count).must_equal 1
3636
end
3737

38-
it '#max_receive_count?' do
39-
expect(record.max_receive_count?).must_equal false
40-
end
41-
4238
end

test/test_helper/jobs.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@
22
require 'test_helper/jobs/basic_job'
33
require 'test_helper/jobs/basic_nofifo_job'
44
require 'test_helper/jobs/error_job'
5+
require 'test_helper/jobs/error_job_no_retry'
6+
require 'test_helper/jobs/error_job_one_retry'

test/test_helper/jobs/application_job.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ module TestHelper
22
module Jobs
33
class ApplicationJob < ActiveJob::Base
44
queue_as ENV['TEST_QUEUE_NAME']
5+
include Lambdakiq::Worker
56
end
67
end
78
end
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module TestHelper
2+
module Jobs
3+
class ErrorJobNoRetry < ApplicationJob
4+
lambdakiq_options retry: 0
5+
def perform(object)
6+
TestHelper::PerformBuffer.add "ErrorJobNoRetry with: #{object.inspect}"
7+
raise('HELL')
8+
end
9+
end
10+
end
11+
end
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module TestHelper
2+
module Jobs
3+
class ErrorJobOneRetry < ApplicationJob
4+
lambdakiq_options retry: 1
5+
def perform(object)
6+
TestHelper::PerformBuffer.add "ErrorJobOneRetry with: #{object.inspect}"
7+
raise('HELL')
8+
end
9+
end
10+
end
11+
end

0 commit comments

Comments
 (0)