Skip to content

Commit e70546b

Browse files
committed
CloudWatch Embedded Metrics
1 parent d521c55 commit e70546b

File tree

11 files changed

+158
-59
lines changed

11 files changed

+158
-59
lines changed

lib/lambdakiq.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ def client
3232
@client ||= Client.new
3333
end
3434

35+
# Returns the `lambdakiq` options object held on the Railtie's configuration.
def config
  railtie_config = Lambdakiq::Railtie.config
  railtie_config.lambdakiq
end
38+
3539
extend self
3640

3741
end

lib/lambdakiq/job.rb

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ def initialize(record)
2222
end
2323

2424
# Parses the record body as JSON (memoized in @job_data) and decorates it with
# the record's message id and the number of prior receives.
def job_data
  return @job_data if @job_data

  parsed = JSON.parse(record.body)
  parsed['provider_job_id'] = record.message_id
  # receive_count includes the current delivery, hence the -1.
  parsed['executions'] = record.receive_count - 1
  @job_data = parsed
end
2730

2831
def active_job
@@ -34,16 +37,16 @@ def queue
3437
end
3538

3639
# Either postpones the message (when fifo_delay? is true) or executes the job,
# then deletes the message. Anything raised along the way is passed to
# perform_error; the rescue names Exception, so non-StandardError exceptions
# are captured too.
# NOTE(review): delete_message also runs right after fifo_delay changed the
# message visibility — confirm that deleting a just-delayed message is intended.
def perform
  if fifo_delay?
    fifo_delay
  else
    execute
  end
  delete_message
rescue Exception => e
  perform_error(e)
end
4645

46+
# Hands the parsed job payload (job_data) to ActiveJob::Base.execute.
def execute
  payload = job_data
  ActiveJob::Base.execute(payload)
end
49+
4750
private
4851

4952
def client_params
@@ -79,7 +82,11 @@ def max_receive_count?
7982
record.max_receive_count? || record.receive_count >= queue.max_receive_count
8083
end
8184

82-
def delay_fifo_message_visibility
85+
# Truthy only when the queue reports fifo? and the record reports
# fifo_delay_seconds?.
def fifo_delay?
  fifo_queue = queue.fifo?
  fifo_queue && record.fifo_delay_seconds?
end
88+
89+
# Changes the message visibility to the record's fifo_delay_visibility_timeout,
# using client_params as the base request parameters.
def fifo_delay
  base_params = client_params
  timeout = record.fifo_delay_visibility_timeout
  client.change_message_visibility(base_params.merge(visibility_timeout: timeout))
end

lib/lambdakiq/metrics.rb

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,72 @@
11
module Lambdakiq
22
class Metrics
3+
attr_reader :event
34

4-
def initialize
5-
@logger = ActiveJob::Base.logger
6-
@namespace = Rails.application.class.name.split('::').first
7-
@dimensions = Concurrent::Array.new
8-
@metrics = Concurrent::Array.new
9-
@properties = Concurrent::Hash.new
5+
# Convenience entry point: builds an instance for the event and logs it.
def self.log(event)
  new(event).log
end
1110

12-
def metrics
13-
yield(self)
14-
ensure
15-
flush
11+
# Stores the event, starts with empty metric and property collections, then
# populates them through instrument!.
def initialize(event)
  @event, @metrics, @properties = event, [], {}
  instrument!
end
1717

18-
def flush
19-
@logger.info(message) unless empty?
18+
# Serializes message as JSON and writes it to logger at info level.
def log
  sink = logger
  sink.info(JSON.dump(message))
end
2121

22-
def benchmark
23-
value = nil
24-
seconds = Benchmark.realtime { value = yield }
25-
milliseconds = (seconds * 1000).to_i
26-
[value, milliseconds]
22+
private
23+
24+
# The object stored under :job in the event payload.
def job
  payload = event.payload
  payload[:job]
end
2827

29-
def put_dimension(name, value)
30-
@dimensions << { name => value }
31-
self
28+
# Class name of the job taken from the event payload.
def job_name
  job_class = job.class
  job_class.name
end
31+
32+
# Logger used for metric output, read from Lambdakiq.config.metrics_logger.
def logger
  settings = Lambdakiq.config
  settings.metrics_logger
end
35+
36+
# Metrics namespace, read from Lambdakiq.config.metrics_namespace.
def namespace
  settings = Lambdakiq.config
  settings.metrics_namespace
end
39+
40+
# First element of the payload's :exception entry, or nil when the payload has
# no such entry.
def exception
  error_info = event.payload[:exception]
  error_info.try(:first)
end
43+
44+
# Returns the dimensions as an array of single-pair hashes (symbol keys), in the
# order AppName, JobEvent, JobName.
def dimensions
  pairs = {
    AppName: rails_app_name,
    JobEvent: event.name,
    JobName: job_name
  }
  pairs.map { |name, value| { name => value } }
end
51+
52+
# Records every metric and property for the event through put_metric and
# set_property: duration, a per-job counter, an exception counter (only when an
# exception is present), job identifiers and one JobArgN property per argument.
def instrument!
  put_metric 'Duration', event.duration.to_i, 'Milliseconds'
  put_metric job_name, 1, 'Count'
  error = exception
  put_metric 'Exceptions', 1, 'Count' if error

  set_property 'JobId', job.job_id
  set_property 'JobName', job_name
  set_property 'QueueName', job.queue_name

  # These properties are only recorded when the value is truthy.
  optional = {
    'MessageId' => job.provider_job_id,
    'Exception' => error,
    'EnqueuedAt' => job.enqueued_at,
    'Executions' => job.executions
  }
  optional.each { |name, value| set_property(name, value) if value }

  # Argument properties are numbered from 1 (JobArg1, JobArg2, ...).
  job.arguments.each.with_index(1) do |argument, position|
    set_property "JobArg#{position}", argument
  end
end
3367

3468
def put_metric(name, value, unit = nil)
35-
@metrics << { 'Name' => name }.tap do |m|
69+
@metrics << { 'Name': name }.tap do |m|
3670
m['Unit'] = unit if unit
3771
end
3872
set_property name, value
@@ -43,24 +77,20 @@ def set_property(name, value)
4377
self
4478
end
4579

46-
def empty?
47-
[@dimensions, @metrics, @properties].all?(&:empty?)
48-
end
49-
5080
# Builds the document that gets logged: an `_aws` metadata member (timestamp,
# namespace, dimension names, metric definitions) plus one root member per
# dimension and per property.
#
# All keys are normalized to strings before merging. Dimension hashes use symbol
# keys while properties are set with string names, so a name present in both
# (e.g. JobName) would otherwise appear twice in the serialized JSON.
#
# Returns a Hash with string keys at the root.
def message
  dims = dimensions.map { |dim| dim.transform_keys(&:to_s) }
  document = {
    '_aws' => {
      'Timestamp' => timestamp,
      'CloudWatchMetrics' => [
        {
          'Namespace' => namespace,
          'Dimensions' => [dims.flat_map(&:keys)],
          'Metrics' => @metrics
        }
      ]
    }
  }
  dims.each { |dim| document.merge!(dim) }
  # Properties are merged last, so they win over a dimension of the same name.
  document.merge!(@properties.transform_keys(&:to_s))
end
@@ -69,5 +99,10 @@ def timestamp
6999
Time.current.strftime('%s%3N').to_i
70100
end
71101

102+
# First segment of the Rails application's class name (the text before the
# first '::').
def rails_app_name
  application_class = Rails.application.class
  application_class.name.split('::').first
end
105+
72106
end
73107
end
108+

lib/lambdakiq/railtie.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,18 @@ module Lambdakiq
22
# Registers Lambdakiq's configuration defaults with Rails and subscribes the
# metrics logger to ActiveJob notifications.
class Railtie < ::Rails::Railtie
  config.lambdakiq = ActiveSupport::OrderedOptions.new
  config.lambdakiq.max_retries = 12
  config.lambdakiq.metrics_namespace = 'Lambdakiq'

  config.after_initialize do
    config.active_job.logger = Rails.logger
    # Only fall back to Rails.logger: a metrics logger assigned by the
    # application during boot must not be overwritten here.
    config.lambdakiq.metrics_logger ||= Rails.logger
  end

  initializer "lambdakiq.metrics" do
    # Every notification whose name matches /active_job/ is wrapped in an Event
    # and logged as a metrics document.
    ActiveSupport::Notifications.subscribe(/active_job/) do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      Lambdakiq::Metrics.log(event)
    end
  end
end
619
end

lib/lambdakiq/record.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ def body
1111
data['body']
1212
end
1313

14+
# Value stored under the 'messageId' key of the record data.
def message_id
  key = 'messageId'
  data[key]
end
17+
1418
# Value stored under the 'receiptHandle' key of the record data.
def receipt_handle
  key = 'receiptHandle'
  data[key]
end
@@ -47,7 +51,7 @@ def receive_count
4751
end
4852

4953
# True once receive_count has reached Lambdakiq.config.max_retries.
def max_receive_count?
  attempts = receive_count
  attempts >= Lambdakiq.config.max_retries
end
5256

5357
def next_visibility_timeout

test/cases/job_test.rb

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,21 @@ class JobTest < LambdakiqSpec
66
expect(delete_message).must_be :present?
77
expect(change_message_visibility).must_be_nil
88
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
9-
expect(active_job_log).must_include 'Performing TestHelper::Jobs::BasicJob'
10-
expect(active_job_log).must_include 'Performed TestHelper::Jobs::BasicJob'
9+
expect(logger).must_include 'Performing TestHelper::Jobs::BasicJob'
10+
expect(logger).must_include 'Performed TestHelper::Jobs::BasicJob'
11+
end
12+
13+
it 'logs cloudwatch embedded metrics' do
  Lambdakiq::Job.handler(event_basic)
  metric = logged_metric('perform.active_job')
  expect(metric).must_be :present?
  expect(metric['AppName']).must_equal 'Dummy'
  expect(metric['JobName']).must_equal 'TestHelper::Jobs::BasicJob'
  # Duration is wall-clock time truncated to whole milliseconds; asserting an
  # exact 0 fails whenever the job happens to take 1ms or more.
  expect(metric['Duration']).must_be_kind_of Integer
  expect(metric['Duration']).must_be :>=, 0
  expect(metric['JobId']).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3'
  expect(metric['QueueName']).must_equal 'lambdakiq-JobsQueue-TESTING123.fifo'
  expect(metric['MessageId']).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807'
  expect(metric['JobArg1']).must_equal 'test'
end
1225

1326
it 'must change message visibility to next value for failed jobs' do
@@ -16,8 +29,8 @@ class JobTest < LambdakiqSpec
1629
expect(change_message_visibility).must_be :present?
1730
expect(change_message_visibility_params[:visibility_timeout]).must_equal 1416
1831
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
19-
expect(active_job_log).must_include 'Performing TestHelper::Jobs::ErrorJob'
20-
expect(active_job_log).must_include 'Error performing TestHelper::Jobs::ErrorJob'
32+
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
33+
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
2134
end
2235

2336
it 'wraps returned errors with no backtrace which avoids excessive/duplicate cloudwatch logging' do
@@ -26,17 +39,17 @@ class JobTest < LambdakiqSpec
2639
expect(error.class.name).must_equal 'Lambdakiq::JobError'
2740
expect(error.backtrace).must_equal []
2841
expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
29-
expect(active_job_log).must_include 'Performing TestHelper::Jobs::ErrorJob'
30-
expect(active_job_log).must_include 'Error performing TestHelper::Jobs::ErrorJob'
42+
expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob'
43+
expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob'
3144
end
3245

3346
it 'must delete message for failed jobs at the end of the queue/message max receive count' do
  attributes = { ApproximateReceiveCount: '8' }
  event = event_basic(attributes: attributes, job_class: 'TestHelper::Jobs::ErrorJob')
  Lambdakiq::Job.handler(event)
  expect(delete_message).must_be :present?
  expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"'
  # Both the start and the failure of the job must show up in the log.
  [
    'Performing TestHelper::Jobs::ErrorJob',
    'Error performing TestHelper::Jobs::ErrorJob'
  ].each do |expected_line|
    expect(logger).must_include expected_line
  end
end
4154

4255
it 'must not perform and allow fifo queue to use message visibility as delay' do
@@ -45,7 +58,7 @@ class JobTest < LambdakiqSpec
4558
expect(change_message_visibility).must_be :present?
4659
expect(change_message_visibility_params[:visibility_timeout]).must_equal 6.minutes
4760
expect(perform_buffer_last_value).must_be_nil
48-
expect(active_job_log).must_be :blank?
61+
expect(logger).must_be :blank?
4962
end
5063

5164
it 'must not perform and allow fifo queue to use message visibility as delay (using SentTimestamp)' do
@@ -54,7 +67,7 @@ class JobTest < LambdakiqSpec
5467
expect(change_message_visibility).must_be :present?
5568
expect(change_message_visibility_params[:visibility_timeout]).must_equal 8.minutes
5669
expect(perform_buffer_last_value).must_be_nil
57-
expect(active_job_log).must_be :blank?
70+
expect(logger).must_be :blank?
5871
end
5972

6073
it 'must perform and allow fifo queue to use message visibility as delay but not when SentTimestamp is too far in the past' do
@@ -63,7 +76,7 @@ class JobTest < LambdakiqSpec
6376
expect(delete_message).must_be :present?
6477
expect(change_message_visibility).must_be_nil
6578
expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"'
66-
expect(active_job_log).must_include 'Performing TestHelper::Jobs::BasicJob'
67-
expect(active_job_log).must_include 'Performed TestHelper::Jobs::BasicJob'
79+
expect(logger).must_include 'Performing TestHelper::Jobs::BasicJob'
80+
expect(logger).must_include 'Performed TestHelper::Jobs::BasicJob'
6881
end
6982
end

test/cases/record_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class RecordTest < LambdakiqSpec
2525

2626
it '#sent_at' do
2727
sent_at = record.sent_at
28-
expect(sent_at).must_be_instance_of Time
28+
expect(sent_at).must_be_instance_of ActiveSupport::TimeWithZone
2929
expect(sent_at.year).must_equal 2020
3030
expect(sent_at.month).must_equal 11
3131
expect(sent_at.day).must_equal 30

test/dummy_app/config/.keep

Whitespace-only changes.

test/dummy_app/init.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
require 'rails/all'

# Minimal Rails application booted by the test suite. It logs to an in-memory
# StringIO and uses the :lambdakiq ActiveJob queue adapter.
module Dummy
  class Application < ::Rails::Application
    # Directory containing this file (previously built as "<this file>/..").
    config.root = __dir__
    config.eager_load = true
    logger = ActiveSupport::Logger.new(StringIO.new)
    logger.formatter = ActiveSupport::Logger::SimpleFormatter.new
    config.logger = logger
    config.active_job.queue_adapter = :lambdakiq
  end
end

Dummy::Application.initialize!

test/test_helper.rb

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010
require 'minitest/focus'
1111
require 'mocha/minitest'
1212
Dir['test/test_helper/*.{rb}'].each { |f| require_relative "../#{f}" }
13-
14-
ActiveJob::Base.queue_adapter = :lambdakiq
1513
Lambdakiq::Client.default_options.merge! stub_responses: true
14+
require_relative './dummy_app/init'
1615

1716
class LambdakiqSpec < Minitest::Spec
1817

@@ -26,7 +25,7 @@ class LambdakiqSpec < Minitest::Spec
2625
before do
2726
client_reset!
2827
client_stub_responses
29-
reset_active_job_logger!
28+
logger_reset!
3029
perform_buffer_clear!
3130
end
3231

test/test_helper/log_helpers.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,23 @@ module LogHelpers
33
extend ActiveSupport::Concern
44

55
included do
6-
let(:active_job_log) { ActiveJob::Base.logger.instance_variable_get(:@logdev).instance_variable_get(:@dev).string }
6+
let(:logger) { Rails.logger.instance_variable_get(:@logdev).instance_variable_get(:@dev).string }
77
end
88

99
private
1010

11-
def reset_active_job_logger!
12-
ActiveJob::Base.logger = Logger.new(StringIO.new)
11+
# Returns the most recently logged metric line that mentions `event`, parsed
# from JSON, or nil when no logged metric line contains it.
#
# event - String to look for inside each metric line (e.g. an event name).
def logged_metric(event)
  # reverse_each walks from the newest line without copying the array.
  line = logged_metrics.reverse_each.find { |l| l.include?(event) }
  JSON.parse(line) if line
end
15+
16+
# Lines of the captured log that contain 'CloudWatchMetrics'.
def logged_metrics
  logger.each_line.select do |line|
    line.include?('CloudWatchMetrics')
  end
end
19+
20+
# Empties the in-memory device behind Rails.logger so each test starts with a
# blank log.
def logger_reset!
  device = Rails.logger.instance_variable_get(:@logdev).instance_variable_get(:@dev)
  device.truncate 0
  # truncate does not move the write position; without rewinding, the next
  # write would be padded with NUL bytes up to the old position.
  device.rewind
end
23+
1424
end
1525
end

0 commit comments

Comments
 (0)