Skip to content

Commit d521c55

Browse files
committed
FIFO Queues Work With Delay. Logging/Perform Helpers.
1 parent 5723df0 commit d521c55

File tree

12 files changed

+144
-14
lines changed

12 files changed

+144
-14
lines changed

TODO.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ end
4747
Q: How do I handle job priorities?
4848
A: Use different queues.
4949

50+
* How we allow FIFO queues to work with delay using message visibility.
5051

5152
## Our Siqekiq Interfaces
5253

lib/lambdakiq/job.rb

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module Lambdakiq
22
class Job
33

4-
attr_reader :record, :error, :sent_timestamp
4+
attr_reader :record, :error
55

66
class << self
77

@@ -33,13 +33,13 @@ def queue
3333
Lambdakiq.client.queues[active_job.queue_name]
3434
end
3535

36-
def performed?
37-
@started_at.present? && !error
38-
end
39-
4036
def perform
41-
@started_at = Time.current
42-
ActiveJob::Base.execute(job_data)
37+
if queue.fifo? && record.fifo_delay_seconds?
38+
delay_fifo_message_visibility
39+
else
40+
ActiveJob::Base.execute(job_data)
41+
end
42+
delete_message
4343
rescue Exception => e
4444
perform_error(e)
4545
end
@@ -79,5 +79,10 @@ def max_receive_count?
7979
record.max_receive_count? || record.receive_count >= queue.max_receive_count
8080
end
8181

82+
def delay_fifo_message_visibility
83+
params = client_params.merge visibility_timeout: record.fifo_delay_visibility_timeout
84+
client.change_message_visibility(params)
85+
end
86+
8287
end
8388
end

lib/lambdakiq/metrics.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def message
6666
end
6767

6868
def timestamp
69-
Time.now.strftime('%s%3N').to_i
69+
Time.current.strftime('%s%3N').to_i
7070
end
7171

7272
end

lib/lambdakiq/record.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,22 @@ def attributes
2323
data['attributes']
2424
end
2525

26+
def fifo_delay_visibility_timeout
27+
fifo_delay_seconds - (Time.current - sent_at).to_i
28+
end
29+
30+
def fifo_delay_seconds
31+
data.dig('messageAttributes', 'delay_seconds', 'stringValue').try(:to_i)
32+
end
33+
34+
def fifo_delay_seconds?
35+
fifo_delay_seconds && (sent_at + fifo_delay_seconds).future?
36+
end
37+
2638
def sent_at
2739
@sent_at ||= begin
28-
ts = attributes['SentTimestamp'].to_i
29-
Time.at(ts/1000)
40+
ts = attributes['SentTimestamp'].to_i / 1000
41+
Time.zone ? Time.zone.at(ts) : Time.at(ts)
3042
end
3143
end
3244

test/cases/job_test.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,69 @@
11
require 'test_helper'
22

33
class JobTest < LambdakiqSpec
4+
it 'must perform basic job' do
5+
Lambdakiq::Job.handler(event_basic)
6+
expect(delete_message).must_be :present?
7+
expect(change_message_visibility).must_be_nil
8+
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
9+
expect(active_job_log).must_include 'Performing TestHelper::Jobs::BasicJob'
10+
expect(active_job_log).must_include 'Performed TestHelper::Jobs::BasicJob'
11+
end
12+
413
it 'must change message visibility to next value for failed jobs' do
514
event = event_basic attributes: { ApproximateReceiveCount: '7' }, job_class: 'TestHelper::Jobs::ErrorJob'
615
expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
716
expect(change_message_visibility).must_be :present?
817
expect(change_message_visibility_params[:visibility_timeout]).must_equal 1416
18+
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
19+
expect(active_job_log).must_include 'Performing TestHelper::Jobs::ErrorJob'
20+
expect(active_job_log).must_include 'Error performing TestHelper::Jobs::ErrorJob'
921
end
1022

1123
it 'wraps returned errors with no backtrace which avoids excessive/duplicate cloudwatch logging' do
1224
event = event_basic job_class: 'TestHelper::Jobs::ErrorJob'
1325
error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL'
1426
expect(error.class.name).must_equal 'Lambdakiq::JobError'
1527
expect(error.backtrace).must_equal []
28+
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
29+
expect(active_job_log).must_include 'Performing TestHelper::Jobs::ErrorJob'
30+
expect(active_job_log).must_include 'Error performing TestHelper::Jobs::ErrorJob'
1631
end
1732

1833
it 'must delete message for failed jobs at the end of the queue/message max receive count' do
1934
event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob'
2035
Lambdakiq::Job.handler(event)
2136
expect(delete_message).must_be :present?
37+
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
38+
expect(active_job_log).must_include 'Performing TestHelper::Jobs::ErrorJob'
39+
expect(active_job_log).must_include 'Error performing TestHelper::Jobs::ErrorJob'
40+
end
41+
42+
it 'must not perform and allow fifo queue to use message visibility as delay' do
43+
event = event_basic_delay minutes: 6
44+
Lambdakiq::Job.handler(event)
45+
expect(change_message_visibility).must_be :present?
46+
expect(change_message_visibility_params[:visibility_timeout]).must_equal 6.minutes
47+
expect(perform_buffer_last_value).must_be_nil
48+
expect(active_job_log).must_be :blank?
49+
end
50+
51+
it 'must not perform and allow fifo queue to use message visibility as delay (using SentTimestamp)' do
52+
event = event_basic_delay minutes: 10, timestamp: 2.minutes.ago.strftime('%s%3N')
53+
Lambdakiq::Job.handler(event)
54+
expect(change_message_visibility).must_be :present?
55+
expect(change_message_visibility_params[:visibility_timeout]).must_equal 8.minutes
56+
expect(perform_buffer_last_value).must_be_nil
57+
expect(active_job_log).must_be :blank?
58+
end
59+
60+
it 'must perform and allow fifo queue to use message visibility as delay but not when SentTimestamp is too far in the past' do
61+
event = event_basic_delay minutes: 2, timestamp: 3.minutes.ago.strftime('%s%3N')
62+
Lambdakiq::Job.handler(event)
63+
expect(delete_message).must_be :present?
64+
expect(change_message_visibility).must_be_nil
65+
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
66+
expect(active_job_log).must_include 'Performing TestHelper::Jobs::BasicJob'
67+
expect(active_job_log).must_include 'Performed TestHelper::Jobs::BasicJob'
2268
end
2369
end

test/test_helper.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,29 @@
55
Bundler.require :default, :development, :test
66
require 'rails'
77
require 'aws-sdk-sqs'
8+
require 'stringio'
89
require 'minitest/autorun'
910
require 'minitest/focus'
1011
require 'mocha/minitest'
1112
Dir['test/test_helper/*.{rb}'].each { |f| require_relative "../#{f}" }
1213

1314
ActiveJob::Base.queue_adapter = :lambdakiq
14-
ActiveJob::Base.logger = Logger.new(IO::NULL)
1515
Lambdakiq::Client.default_options.merge! stub_responses: true
1616

1717
class LambdakiqSpec < Minitest::Spec
1818

1919
include TestHelper::ClientHelpers,
2020
TestHelper::ApiRequestHelpers,
2121
TestHelper::EventHelpers,
22-
TestHelper::QueueHelpers
22+
TestHelper::QueueHelpers,
23+
TestHelper::LogHelpers,
24+
TestHelper::PerformHelpers
2325

2426
before do
2527
client_reset!
2628
client_stub_responses
29+
reset_active_job_logger!
30+
perform_buffer_clear!
2731
end
2832

2933
end

test/test_helper/event_helpers.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,19 @@ def event_basic(overrides = {})
1010
Events::Basic.create(overrides)
1111
end
1212

13+
def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N'))
14+
Events::Basic.create(
15+
attributes: { SentTimestamp: timestamp },
16+
messageAttributes: {
17+
delay_seconds: {
18+
stringValue: minutes.minutes.to_s,
19+
stringListValues: [],
20+
binaryListValues: [],
21+
dataType: 'String'
22+
}
23+
}
24+
)
25+
end
26+
1327
end
1428
end

test/test_helper/jobs/basic_job.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module TestHelper
22
module Jobs
33
class BasicJob < ApplicationJob
44
def perform(object)
5-
object
5+
TestHelper::PerformBuffer.add "BasicJob with: #{object.inspect}"
66
end
77
end
88
end

test/test_helper/jobs/basic_nofifo_job.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module Jobs
33
class BasicNofifoJob < ApplicationJob
44
queue_as ENV['TEST_QUEUE_NAME'].sub('.fifo','')
55
def perform(object)
6-
object
6+
TestHelper::PerformBuffer.add "BasicNofifoJob with: #{object.inspect}"
77
end
88
end
99
end

test/test_helper/jobs/error_job.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ module TestHelper
22
module Jobs
33
class ErrorJob < ApplicationJob
44
def perform(object)
5+
TestHelper::PerformBuffer.add "ErrorJob with: #{object.inspect}"
56
raise('HELL')
67
end
78
end

test/test_helper/log_helpers.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
module TestHelper
2+
module LogHelpers
3+
extend ActiveSupport::Concern
4+
5+
included do
6+
let(:active_job_log) { ActiveJob::Base.logger.instance_variable_get(:@logdev).instance_variable_get(:@dev).string }
7+
end
8+
9+
private
10+
11+
def reset_active_job_logger!
12+
ActiveJob::Base.logger = Logger.new(StringIO.new)
13+
end
14+
end
15+
end

test/test_helper/perform_helpers.rb

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
module TestHelper
2+
module PerformBuffer
3+
def clear
4+
values.clear
5+
end
6+
7+
def add(value)
8+
values << value
9+
end
10+
11+
def values
12+
@values ||= []
13+
end
14+
15+
def last_value
16+
values.last
17+
end
18+
19+
extend self
20+
end
21+
module PerformHelpers
22+
private
23+
24+
def perform_buffer_clear!
25+
PerformBuffer.clear
26+
end
27+
28+
def perform_buffer_last_value
29+
PerformBuffer.last_value
30+
end
31+
end
32+
end

0 commit comments

Comments
 (0)