diff --git a/Gemfile.lock b/Gemfile.lock index be2f065..875c387 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,10 +4,25 @@ PATH lambdakiq (1.0.0) activejob aws-sdk-sqs + concurrent-ruby + railties GEM remote: https://rubygems.org/ specs: + actionpack (6.0.3.4) + actionview (= 6.0.3.4) + activesupport (= 6.0.3.4) + rack (~> 2.0, >= 2.0.8) + rack-test (>= 0.6.3) + rails-dom-testing (~> 2.0) + rails-html-sanitizer (~> 1.0, >= 1.2.0) + actionview (6.0.3.4) + activesupport (= 6.0.3.4) + builder (~> 3.1) + erubi (~> 1.4) + rails-dom-testing (~> 2.0) + rails-html-sanitizer (~> 1.1, >= 1.2.0) activejob (6.0.3.4) activesupport (= 6.0.3.4) globalid (>= 0.3.6) @@ -29,25 +44,54 @@ GEM aws-sigv4 (~> 1.1) aws-sigv4 (1.2.2) aws-eventstream (~> 1, >= 1.0.2) + builder (3.2.4) coderay (1.1.3) concurrent-ruby (1.1.7) + crass (1.0.6) + erubi (1.10.0) globalid (0.4.2) activesupport (>= 4.2.0) i18n (1.8.5) concurrent-ruby (~> 1.0) jmespath (1.4.0) + loofah (2.8.0) + crass (~> 1.0.2) + nokogiri (>= 1.5.9) + macaddr (1.7.2) + systemu (~> 2.6.5) method_source (1.0.0) + mini_portile2 (2.4.0) minitest (5.14.2) minitest-focus (1.2.1) minitest (>= 4, < 6) mocha (1.11.2) + nokogiri (1.10.10) + mini_portile2 (~> 2.4.0) pry (0.13.1) coderay (~> 1.1) method_source (~> 1.0) + rack (2.2.3) + rack-test (1.1.0) + rack (>= 1.0, < 3) + rails-dom-testing (2.0.3) + activesupport (>= 4.2.0) + nokogiri (>= 1.6) + rails-html-sanitizer (1.3.0) + loofah (~> 2.3) + railties (6.0.3.4) + actionpack (= 6.0.3.4) + activesupport (= 6.0.3.4) + method_source + rake (>= 0.8.7) + thor (>= 0.20.3, < 2.0) rake (13.0.1) + systemu (2.6.5) + thor (1.0.1) thread_safe (0.3.6) tzinfo (1.2.8) thread_safe (~> 0.1) + uuid (2.3.9) + macaddr (~> 1.0) zeitwerk (2.4.2) PLATFORMS @@ -61,6 +105,7 @@ DEPENDENCIES mocha pry rake + uuid BUNDLED WITH 2.1.4 diff --git a/README.md b/README.md index b877de1..ac7f8fa 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,247 @@ +![Lambdakiq: ActiveJob on SQS & Lambda](images/Lambdakiq.png) + # Lambdakiq -TODO ... +Lamby: Simple Rails & AWS Lambda Integration using Rack.A drop-in replacement for [Sidekiq](https://github.com/mperham/sidekiq) when running Rails in AWS Lambda using the [Lamby](https://lamby.custominktech.com) gem. + +Lambdakiq allows you to leverage AWS' managed infrastructure to the fullest extent. Gone are the days of managing pods and long polling processes. Instead AWS delivers messages directly to your Rails' job functions and scales it up and down as needed. Observability is built in using AWS CloudWatch Metrics, Dashboards, and Alarms. Learn more about [Using AWS Lambda with Amazon SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) or get started now. + +## Key Features + +* Distinct web & jobs Lambda functions. +* AWS fully managed polling. Event-driven. +* Maximum 12 retries. Per job configurable. +* Mirror Sidekiq's retry [backoff](https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry) timing. +* Last retry is at 11 hours 30 minutes. +* Supports ActiveJob's wait/delay. Up to 15 minutes. +* Dead messages are stored for up to 14 days. + +## Project Setup + +This gem assumes your Rails application is on AWS Lambda, ideally with our [Lamby](https://lamby.custominktech.com) gem. It could be using Lambda's traditional zip package type or the newer [container](https://dev.to/aws-heroes/lambda-containers-with-rails-a-perfect-match-4lgb) format. If Rails on Lambda is new to you, consider following our [quick start](https://lamby.custominktech.com/docs/quick_start) guide to get your first application up and running. From there, to use Lambdakiq, here are steps to setup your project + + +### Bundle & Config + +Add the Lambdakiq gem to your `Gemfile`. + +```ruby +gem 'lambdakiq' +``` + +Open `config/initializers/production.rb` and set Lambdakiq as your ActiveJob queue adapter. + +```ruby +config.active_job.queue_adapter = :lambdakiq +``` + +Open `app/jobs/application_job.rb` and add our worker module. The queue name will be set by an environment using CloudFormation further down. + +```ruby +class ApplicationJob < ActiveJob::Base + include Lambdakiq::Worker + queue_as ENV['JOBS_QUEUE_NAME'] +end +``` + +### SQS Resources + +Open up your project's SAM [`template.yaml`](https://lamby.custominktech.com/docs/anatomy#file-template-yaml) file and make the following additions and changes. First, we need to create your [SQS queues](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html) under the `Resources` section. + +```yaml +JobsQueue: + Type: AWS::SQS::Queue + Properties: + ReceiveMessageWaitTimeSeconds: 10 + RedrivePolicy: + deadLetterTargetArn: !GetAtt JobsDLQueue.Arn + maxReceiveCount: 13 + VisibilityTimeout: 301 + +JobsDLQueue: + Type: AWS::SQS::Queue + Properties: + MessageRetentionPeriod: 1209600 +``` + +In this example above we are also creating a queue to automatically handle our redrives and storage for any dead messages. We use [long polling](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling) to receive messages for lower costs. In most cases your message is consumed almost immediately. Sidekiq polling is around 10s too. + +The max receive count is 13 which means you get 12 retries. This is done so we can mimic Sidekiq's [automatic retry and backoff](https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry). The dead letter queue retains messages for the maximum of 14 days. This can be changed as needed. We also make no assumptions on how you want to handle dead jobs. + +### Queue Name Environment Variable + +We need to pass the newly created queue's name as an environment variable to your soon to be created jobs function. Since it is common for your Rails web and jobs functions to share these, we can leverage [SAM's Globals](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-specification-template-anatomy-globals.html) section. + +```yaml +Globals: + Function: + Environment: + Variables: + RAILS_ENV: !Ref RailsEnv + JOBS_QUEUE_NAME: !GetAtt JobsQueue.QueueName +``` + +We can remove the `Environment` section from our web function and all functions in this stack will now use the globals. Here we are using an [intrinsic function](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference-getatt.html) to pass the queue's name as the `JOBS_QUEUE_NAME` environment variable. + +### IAM Permissions + +Both functions will need capabilities to access the SQS jobs queue. We can add or extend the [SAM Policies](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-resource-function.html#sam-function-policies) section of our `RailsLambda` web function so it (and our soon to be created jobs function) have full capabilities to this new queue. + +```yaml +Policies: + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - sqs:* + Resource: + - !Sub arn:aws:sqs:${AWS::Region}:${AWS::AccountId}:${JobsQueue.QueueName} +``` + +Now we can duplicate our `RailsLambda` resource YAML (except for the `Events` property) to a new `JobsLambda` one. This gives us a distinct Lambda function to process jobs whose events, memory, timeout, and more can be independently tuned. However, both the `web` and `jobs` functions will use the same ECR container image! + +```yaml +JobsLambda: + Type: AWS::Serverless::Function + Metadata: + DockerContext: ./.lamby/RailsLambda + Dockerfile: Dockerfile + DockerTag: jobs + Properties: + Events: + SQSJobs: + Type: SQS + Properties: + Queue: !GetAtt JobsQueue.Arn + BatchSize: 1 + MemorySize: 1792 + PackageType: Image + Policies: + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - sqs:* + Resource: + - !Sub arn:aws:sqs:${AWS::Region}:${AWS::AccountId}:${JobsQueue.QueueName} + Timeout: 300 +``` + +Here are some key aspects of our `JobsLambda` resource above: + +* The `Events` property uses the [SQS Type](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html). +* Our [BatchSize](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-sqs.html#sam-function-sqs-batchsize) is set to one so we can handle retrys more easily without worrying about idempotency in larger batches. +* The `Metadata`'s Docker properties must be the same as our web function except for the `DockerTag`. This is needed for the image to be shared. This works around a known [SAM issue](https://github.com/aws/aws-sam-cli/issues/2466) vs using the `ImageConfig` property. +* The jobs function `Timeout` must be lower than the `JobsQueue`'s `VisibilityTimeout` property. When the batch size is one, the queue's visibility is generally one second more. + +🎉 Deploy your application and have fun with ActiveJob on SQS & Lambda. + +## Configuration + +Most general Lambdakiq configuration options are exposed via the Rails standard configuration method. + +### Rails Configs + +```ruby +config.lambdakiq +``` + +* `max_retries=` - Retries for all jobs. Default is the Lambdakiq maximum of `12`. +* `metrics_namespace=` - The CloudWatch Embedded Metrics namespace. Default is `Lambdakiq`. +* `metrics_logger=` - Set to the Rails logger which is STDOUT via Lamby/Lambda. + +### ActiveJob Configs + +You can also set configuration options on a per job basis using the `lambdakiq_options` method. + +```ruby +class OrderProcessorJob < ApplicationJob + lambdakiq_options retry: 2 +end +``` + +* `retry` - Overrides the default Lambdakiq `max_retries` for this one job. + +## Concurrency & Limits + +AWS SQS is highly scalable with [few limits](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html). As your jobs in SQS increases so should your concurrent functions to process that work. However, as this article, ["Why isn't my Lambda function with an Amazon SQS event source scaling optimally?"](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-sqs-scaling/) describes it is possible that errors will effect your concurrency. + +To help keep your queue and workers scalable, reduce the errors raised by your jobs. You an also reduce the retry count. + +## Observability with CloudWatch + +Get ready to gain way more insights into your ActiveJobs using AWS' [CloudWatch](https://aws.amazon.com/cloudwatch/) service. Every AWS service, including SQS & Lambda, publishes detailed [CloudWatch Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html). This gem leverages [CloudWatch Embedded Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html) to add detailed ActiveJob metrics to that system. You can mix and match these data points to build your own [CloudWatch Dashboards](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Dashboards.html). If needed, any combination can be used to trigger [CloudWatch Alarms](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html). Much like Sumo Logic, you can search & query for data using [CloudWatch Logs Insights](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html). + +![CloudWatch Dashboard](https://user-images.githubusercontent.com/2381/106465990-be7a6200-6468-11eb-8461-93db0046cda5.png) + +Metrics are published under the `Lambdakiq` namespace. This is configurable using `config.lambdakiq.metrics_namespace` but should not be needed since all metrics are published using these three dimensions which allow you to easily segment metrics/dashboards to a specific application. + +### Metric Dimensions + +* `AppName` - This is the name of your Rails application. Ex: `MyApp` +* `JobEvent` - Name of the ActiveSupport Notification. Ex: `*.active_job`. +* `JobName` - The class name of the ActiveSupport job. Ex: `NotificationJob` + +### ActiveJob Event Names +For reference, here are the `JobEvent` names published by ActiveSupport. A few of these are instrumented by Lambdakiq since we use custom retry logic like Sidekiq. These event/metrics are found in the Rails application CloudWatch logs because they publish/enqueue jobs. + +* `enqueue.active_job` +* `enqueue_at.active_job` + +While these event/metrics can be found in the jobs function's log. + +* `perform_start.active_job` +* `perform.active_job` +* `enqueue_retry.active_job` +* `retry_stopped.active_job` + +### Metric Properties + +These are the properties published with each metric. Remember, properties can not be used as metric data in charts but can be searched using [CloudWatch Logs Insights](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html). + +* `JobId` - ActiveJob Unique ID. Ex: `9f3b6977-6afc-4769-aed6-bab1ad9a0df5` +* `QueueName` - SQS Queue Name. Ex: `myapp-JobsQueue-14F18LG6XFUW5.fifo` +* `MessageId` - SQS Message ID. Ex: `5653246d-dc5e-4c95-9583-b6b83ec78602` +* `ExceptionName` - Class name of error raised. Present in perform and retry events. +* `EnqueuedAt` - When ActiveJob enqueued the message. Ex: `2021-01-14T01:43:38Z` +* `Executions` - The number of current executions. Counts from `1` and up. +* `JobArg#{n}` - Enumerated serialized arguments. + +### Metric Data + +And finally, here are the metrics which each dimension can chart using [CloudWatch Metrics & Dashboards](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Dashboards.html). + +* `Duration` - Of the job event in milliseconds. +* `Count` - Of the event. +* `ExceptionCount` - Of the event. Useful with `ExceptionName`. + +### CloudWatch Dashboard Examples + +Please share how you are using CloudWatch to monitor and/or alert on your ActiveJobs with Lambdakiq! + +💬 https://github.com/customink/lambdakiq/discussions/3 + + +## Common Questions + +**Are Scheduled Jobs Supported?** - No. If you need a scheduled job please use the [SAM Schedule](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-schedule.html) event source which invokes your function with an [Eventbridege AWS::Events::Rule](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-events-rule.html). + +**Are FIFO Queues Supported?** - Yes. When you create your [AWS::SQS::Queue](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html) resources you can set the [FifoQueue](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html#aws-sqs-queue-fifoqueue) property to `true`. Remember that both your jobs queue and the redrive queue must be the same. When using FIFO we: + +* Simulate `delay_seconds` for ActiveJob's wait by using visibility timeouts under the hood. We still cap it to non-FIFO's 15 minutes. +* Set both the messages `message_group_id` and `message_deduplication_id` to the unique job id provided by ActiveJob. + +**Can I Use Multiple Queues?** - Yes. Nothing is stopping you from creating any number of queues and/or functions to process them. Your subclasses can use ActiveJob's `queue_as` method as needed. This is an easy way to handle job priorities too. ```ruby -# TODO ... +class SomeLowPriorityJob < ApplicationJob + queue_as ENV['BULK_QUEUE_NAME'] +end ``` +**What Is The Max Message Size?** - 256KB. ActiveJob messages should be small however since Rails uses the [GlobalID](https://github.com/rails/globalid) gem to avoid marshaling large data structures to jobs. + ## Contributing After checking out the repo, run: diff --git a/Rakefile b/Rakefile index 76f2c87..81933a9 100644 --- a/Rakefile +++ b/Rakefile @@ -5,7 +5,7 @@ require "rake/testtask" Rake::TestTask.new(:test) do |t| t.libs << "test" t.libs << "lib" - t.test_files = FileList["test/**/*_test.rb"] + t.test_files = FileList["test/cases/**/*_test.rb"] t.verbose = false t.warning = false end diff --git a/bin/_console b/bin/_console new file mode 100755 index 0000000..36a4d09 --- /dev/null +++ b/bin/_console @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby + +require "bundler/setup" +require "lambdakiq" + +# You can add fixtures and/or initialization code here to make experimenting +# with your gem easier. You can also use a different console, if you like. + +# (If you use this, don't forget to add pry to your Gemfile!) +# require "pry" +# Pry.start + +require "irb" +IRB.start(__FILE__) diff --git a/bin/console b/bin/console index 36a4d09..d577a66 100755 --- a/bin/console +++ b/bin/console @@ -1,14 +1,6 @@ -#!/usr/bin/env ruby +#!/bin/bash +set -e -require "bundler/setup" -require "lambdakiq" - -# You can add fixtures and/or initialization code here to make experimenting -# with your gem easier. You can also use a different console, if you like. - -# (If you use this, don't forget to add pry to your Gemfile!) -# require "pry" -# Pry.start - -require "irb" -IRB.start(__FILE__) +docker-compose run \ + lambdakiqgem \ + ./bin/_console diff --git a/images/Lambdakiq.png b/images/Lambdakiq.png new file mode 100644 index 0000000..0cedca0 Binary files /dev/null and b/images/Lambdakiq.png differ diff --git a/images/Lambdakiq.sketch b/images/Lambdakiq.sketch new file mode 100644 index 0000000..a7d7c37 Binary files /dev/null and b/images/Lambdakiq.sketch differ diff --git a/lamby.gemspec b/lambdakiq.gemspec similarity index 90% rename from lamby.gemspec rename to lambdakiq.gemspec index 14652cc..63fe1cb 100644 --- a/lamby.gemspec +++ b/lambdakiq.gemspec @@ -12,17 +12,20 @@ Gem::Specification.new do |spec| spec.homepage = "https://github.com/customink/lambdakiq" spec.license = "MIT" spec.files = Dir.chdir(File.expand_path('..', __FILE__)) do - `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) } + `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|images)/}) } end spec.bindir = "exe" spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } spec.require_paths = ["lib"] spec.add_dependency 'activejob' spec.add_dependency 'aws-sdk-sqs' + spec.add_dependency 'concurrent-ruby' + spec.add_dependency 'railties' spec.add_development_dependency 'bundler' spec.add_development_dependency 'rake' spec.add_development_dependency 'minitest' spec.add_development_dependency 'minitest-focus' spec.add_development_dependency 'mocha' spec.add_development_dependency 'pry' + spec.add_development_dependency 'uuid' end diff --git a/lib/lambdakiq.rb b/lib/lambdakiq.rb index 52e7f4d..518e926 100644 --- a/lib/lambdakiq.rb +++ b/lib/lambdakiq.rb @@ -1,16 +1,41 @@ +require 'json' +require 'digest' require 'active_job' -require 'aws-sdk-sqs' +require 'active_job/queue_adapters' +require 'active_support/all' require 'lambdakiq/version' - -# if defined?(Rails) -# require 'rails/railtie' -# require 'lambdakiq/railtie' -# end +require 'lambdakiq/error' +require 'lambdakiq/adapter' +require 'lambdakiq/client' +require 'lambdakiq/queue' +require 'lambdakiq/message' +require 'lambdakiq/event' +require 'lambdakiq/job' +require 'lambdakiq/record' +require 'lambdakiq/backoff' +require 'lambdakiq/metrics' +require 'lambdakiq/worker' +require 'rails/railtie' +require 'lambdakiq/railtie' module Lambdakiq - extend self + def handler(event) + Job.handler(event) + end + + def jobs?(event) + Event.jobs?(event) + end - # autoload :Xyz, 'lambdakiq/xyz' + def client + @client ||= Client.new + end + + def config + Lambdakiq::Railtie.config.lambdakiq + end + + extend self end diff --git a/lib/lambdakiq/adapter.rb b/lib/lambdakiq/adapter.rb new file mode 100644 index 0000000..6f20616 --- /dev/null +++ b/lib/lambdakiq/adapter.rb @@ -0,0 +1,38 @@ +module ActiveJob + module QueueAdapters + class LambdakiqAdapter + + def enqueue(job, options = {}) + job.lambdakiq_async? ? _enqueue_async(job, options) : _enqueue(job, options) + end + + def enqueue_at(job, timestamp) + enqueue job, delay_seconds: delay_seconds(timestamp) + end + + private + + def delay_seconds(timestamp) + ds = (timestamp - Time.current.to_i).to_i + [ds, 900].min + end + + def _enqueue(job, options = {}) + queue = Lambdakiq.client.queues[job.queue_name] + queue.send_message job, options + end + + def _enqueue_async(job, options = {}) + Concurrent::Promise + .execute { _enqueue(job, options) } + .on_error { |e| async_enqueue_error(e) } + end + + def async_enqueue_error(e) + msg = "[Lambdakiq] Failed to queue job #{job}. Reason: #{e}" + Rails.logger.error(msg) + end + + end + end +end diff --git a/lib/lambdakiq/backoff.rb b/lib/lambdakiq/backoff.rb new file mode 100644 index 0000000..547d493 --- /dev/null +++ b/lib/lambdakiq/backoff.rb @@ -0,0 +1,40 @@ +module Lambdakiq + class Backoff + + MAX_VISIBILITY_TIMEOUT = 43200 # 12 Hours + + attr_reader :count + + class << self + + def backoff(count) + new(count).backoff + end + + end + + def initialize(count) + @count = count + end + + # From Sidekiq: https://git.io/fhi5O + # + def backoff + case count + when 1 then 30 + when 2 then 46 + when 3 then 76 + when 4 then 156 + when 5 then 346 + when 6 then 730 + when 7 then 1416 + when 8 then 2536 + when 9 then 4246 + when 10 then 6726 + when 11 then 10180 + when 12 then 14836 + end + end + + end +end diff --git a/lib/lambdakiq/client.rb b/lib/lambdakiq/client.rb new file mode 100644 index 0000000..d11d411 --- /dev/null +++ b/lib/lambdakiq/client.rb @@ -0,0 +1,37 @@ +module Lambdakiq + class Client + + class_attribute :default_options, + instance_writer: false, + instance_predicate: false, + default: Hash.new + + attr_reader :queues + + def initialize + @queues = Hash.new do |h, name| + h[name] = Queue.new(name) + end + end + + def sqs + @sqs ||= begin + require 'aws-sdk-sqs' + Aws::SQS::Client.new(options) + end + end + + private + + def options + default_options.tap do |opts| + opts[:region] ||= region if region + end + end + + def region + ENV['AWS_REGION'] + end + + end +end diff --git a/lib/lambdakiq/error.rb b/lib/lambdakiq/error.rb new file mode 100644 index 0000000..2c0179b --- /dev/null +++ b/lib/lambdakiq/error.rb @@ -0,0 +1,21 @@ +module Lambdakiq + class Error < StandardError + end + + class JobError < Error + attr_reader :original_exception, :job + + def initialize(error) + @original_exception = error + super(error.message) + set_backtrace Rails.backtrace_cleaner.clean(error.backtrace) + end + end + + class FifoDelayError < Error + def initialize(error) + super + set_backtrace([]) + end + end +end diff --git a/lib/lambdakiq/event.rb b/lib/lambdakiq/event.rb new file mode 100644 index 0000000..3947c74 --- /dev/null +++ b/lib/lambdakiq/event.rb @@ -0,0 +1,19 @@ +module Lambdakiq + module Event + + def jobs?(event) + records(event).any? { |r| job?(r) } + end + + def job?(record) + record.dig('messageAttributes', 'lambdakiq', 'stringValue') == '1' + end + + def records(event) + event['Records'] || [] + end + + extend self + + end +end diff --git a/lib/lambdakiq/job.rb b/lib/lambdakiq/job.rb new file mode 100644 index 0000000..7b4644a --- /dev/null +++ b/lib/lambdakiq/job.rb @@ -0,0 +1,120 @@ +module Lambdakiq + class Job + + attr_reader :record, :error + + class << self + + def handler(event) + records = Event.records(event) + jobs = records.map { |record| new(record) } + jobs.each(&:perform) + jwerror = jobs.detect{ |j| j.error } + return unless jwerror + raise JobError.new(jwerror.error) + end + + end + + def initialize(record) + @record = Record.new(record) + @error = false + end + + def job_data + @job_data ||= JSON.parse(record.body).tap do |data| + data['provider_job_id'] = record.message_id + data['executions'] = record.receive_count - 1 + end + end + + def active_job + @active_job ||= ActiveJob::Base.deserialize(job_data) + end + + def queue + Lambdakiq.client.queues[active_job.queue_name] + end + + def executions + active_job.executions + end + + def perform + if fifo_delay? + fifo_delay + raise FifoDelayError, active_job.job_id + end + execute + end + + def execute + ActiveJob::Base.execute(job_data) + delete_message + rescue Exception => e + increment_executions + perform_error(e) + end + + private + + def client_params + { queue_url: queue.queue_url, receipt_handle: record.receipt_handle } + end + + def perform_error(e) + if change_message_visibility + instrument :enqueue_retry, error: e, wait: record.next_visibility_timeout + @error = e + else + instrument :retry_stopped, error: e + delete_message + end + end + + def delete_message + client.delete_message(client_params) + rescue Exception => e + true + end + + def change_message_visibility + return false if max_receive_count? + params = client_params.merge visibility_timeout: record.next_visibility_timeout + client.change_message_visibility(params) + true + end + + def client + Lambdakiq.client.sqs + end + + def max_receive_count? + executions > retry_limit + end + + def retry_limit + config_retry = [Lambdakiq.config.max_retries, 12].min + [ (active_job.lambdakiq_retry || config_retry), + (queue.max_receive_count - 1) ].min + end + + def fifo_delay? + queue.fifo? && record.fifo_delay_seconds? + end + + def fifo_delay + params = client_params.merge visibility_timeout: record.fifo_delay_visibility_timeout + client.change_message_visibility(params) + end + + def increment_executions + active_job.executions = active_job.executions + 1 + end + + def instrument(name, error: nil, wait: nil) + active_job.send :instrument, name, error: error, wait: wait + end + + end +end diff --git a/lib/lambdakiq/message.rb b/lib/lambdakiq/message.rb new file mode 100644 index 0000000..fd48cfd --- /dev/null +++ b/lib/lambdakiq/message.rb @@ -0,0 +1,67 @@ +module Lambdakiq + class Message + LAMBDAKIQ_ATTRIBUTE = { 'lambdakiq' => { string_value: '1', data_type: 'String' } }.freeze + + attr_reader :queue, :job, :options + + def initialize(queue, job, options = {}) + @queue = queue + @job = job + @options = options + end + + def params + message_params.merge(message_options) + end + + private + + def message_params + { message_body: message_body, + message_attributes: message_attributes } + .merge(message_params_fifo) + end + + def message_options + if queue.fifo? + options.except(:delay_seconds) + else + options + end + end + + def message_body + JSON.dump(job.serialize) + end + + def message_params_fifo + if queue.fifo? + { message_group_id: job.job_id, + message_deduplication_id: job.job_id } + else + {} + end + end + + def message_attributes + LAMBDAKIQ_ATTRIBUTE.merge(delay_seconds_attribute) + end + + def delay_seconds + options[:delay_seconds] || 0 + end + + def delay_seconds? + !delay_seconds.zero? + end + + def delay_seconds_attribute + if queue.fifo? && delay_seconds? + { 'delay_seconds' => { string_value: delay_seconds.to_s, data_type: 'String' } } + else + {} + end + end + + end +end diff --git a/lib/lambdakiq/metrics.rb b/lib/lambdakiq/metrics.rb new file mode 100644 index 0000000..9d1c8aa --- /dev/null +++ b/lib/lambdakiq/metrics.rb @@ -0,0 +1,110 @@ +module Lambdakiq + class Metrics + attr_reader :event + + class << self + def log(event) + new(event).log + end + end + + def initialize(event) + @event = event + @metrics = [] + @properties = {} + instrument! + end + + def log + logger.info JSON.dump(message) + end + + private + + def job + event.payload[:job] + end + + def job_name + job.class.name + end + + def logger + Lambdakiq.config.metrics_logger + end + + def namespace + Lambdakiq.config.metrics_namespace + end + + def exception_name + event.payload[:exception].try(:first) || + event.payload[:error]&.class&.name + end + + def dimensions + [ + { AppName: rails_app_name }, + { JobEvent: event.name }, + { JobName: job_name } + ] + end + + def instrument! + put_metric 'Duration', event.duration.to_i, 'Milliseconds' + put_metric 'Count', 1, 'Count' + put_metric 'ExceptionCount', 1, 'Count' if exception_name + set_property 'JobId', job.job_id + set_property 'JobName', job_name + set_property 'QueueName', job.queue_name + set_property 'MessageId', job.provider_job_id if job.provider_job_id + set_property 'ExceptionName', exception_name if exception_name + set_property 'EnqueuedAt', job.enqueued_at if job.enqueued_at + set_property 'Executions', job.executions if job.executions + job.arguments.each_with_index do |argument, index| + set_property "JobArg#{index+1}", argument + end + end + + def put_metric(name, value, unit = nil) + @metrics << { 'Name': name }.tap do |m| + m['Unit'] = unit if unit + end + set_property name, value + end + + def set_property(name, value) + @properties[name] = value + self + end + + def message + { + '_aws': { + 'Timestamp': timestamp, + 'CloudWatchMetrics': [ + { + 'Namespace': namespace, + 'Dimensions': [dimensions.map(&:keys).flatten], + 'Metrics': @metrics + } + ] + } + }.tap do |m| + dimensions.each { |d| m.merge!(d) } + m.merge!(@properties) + end + end + + def timestamp + Time.current.strftime('%s%3N').to_i + end + + def rails_app_name + Lambdakiq.config.metrics_app_name || + Rails.application.class.name.split('::').first + end + + end +end + diff --git a/lib/lambdakiq/queue.rb b/lib/lambdakiq/queue.rb new file mode 100644 index 0000000..d1968ae --- /dev/null +++ b/lib/lambdakiq/queue.rb @@ -0,0 +1,55 @@ +module Lambdakiq + class Queue + + attr_reader :queue_name, + :queue_url + + def initialize(queue_name) + @queue_name = queue_name + @queue_url = get_queue_url + attributes + end + + def send_message(job, options = {}) + client.send_message send_message_params(job, options) + end + + def attributes + @attributes ||= client.get_queue_attributes({ + queue_url: queue_url, + attribute_names: ['All'] + }).attributes + end + + def redrive_policy + @redrive_policy ||= JSON.parse(attributes['RedrivePolicy']) + end + + def max_receive_count + redrive_policy['maxReceiveCount'].to_i + end + + def fifo? + queue_name.ends_with?('.fifo') + end + + private + + def client + Lambdakiq.client.sqs + end + + def get_queue_url + client.get_queue_url(queue_name: queue_name).queue_url + end + + def send_message_params(job, options) + { queue_url: queue_url }.merge(message_params(job, options)) + end + + def message_params(job, options) + Message.new(self, job, options).params + end + + end +end diff --git a/lib/lambdakiq/railtie.rb b/lib/lambdakiq/railtie.rb index b8636d4..667c340 100644 --- a/lib/lambdakiq/railtie.rb +++ b/lib/lambdakiq/railtie.rb @@ -1,5 +1,19 @@ module Lambdakiq class Railtie < ::Rails::Railtie config.lambdakiq = ActiveSupport::OrderedOptions.new + config.lambdakiq.max_retries = 12 + config.lambdakiq.metrics_namespace = 'Lambdakiq' + + config.after_initialize do + config.active_job.logger = Rails.logger + config.lambdakiq.metrics_logger = Rails.logger + end + + initializer "lambdakiq.metrics" do |app| + ActiveSupport::Notifications.subscribe(/active_job/) do |*args| + event = ActiveSupport::Notifications::Event.new *args + Lambdakiq::Metrics.log(event) + end + end end end diff --git a/lib/lambdakiq/record.rb b/lib/lambdakiq/record.rb new file mode 100644 index 0000000..82e85e3 --- /dev/null +++ b/lib/lambdakiq/record.rb @@ -0,0 +1,58 @@ +module Lambdakiq + class Record + + attr_reader :data + + def initialize(data) + @data = data + end + + def body + data['body'] + end + + def message_id + data['messageId'] + end + + def receipt_handle + data['receiptHandle'] + end + + def queue_name + @queue_name ||= data['eventSourceARN'].split(':').last + end + + def attributes + data['attributes'] + end + + def fifo_delay_visibility_timeout + fifo_delay_seconds - (Time.current - sent_at).to_i + end + + def fifo_delay_seconds + data.dig('messageAttributes', 'delay_seconds', 'stringValue').try(:to_i) + end + + def fifo_delay_seconds? + fifo_delay_seconds && (sent_at + fifo_delay_seconds).future? + end + + def sent_at + @sent_at ||= begin + ts = attributes['SentTimestamp'].to_i / 1000 + Time.zone ? Time.zone.at(ts) : Time.at(ts) + end + end + + def receive_count + @receive_count ||= attributes['ApproximateReceiveCount'].to_i + end + + def next_visibility_timeout + @next_visibility_timeout ||= Backoff.backoff(receive_count) + end + + end +end diff --git a/lib/lambdakiq/version.rb b/lib/lambdakiq/version.rb index 386ef2b..b266797 100644 --- a/lib/lambdakiq/version.rb +++ b/lib/lambdakiq/version.rb @@ -1,3 +1,3 @@ module Lambdakiq - VERSION = '0.0.1' + VERSION = '1.0.0' end diff --git a/lib/lambdakiq/worker.rb b/lib/lambdakiq/worker.rb new file mode 100644 index 0000000..5f3f739 --- /dev/null +++ b/lib/lambdakiq/worker.rb @@ -0,0 +1,28 @@ +module Lambdakiq + module Worker + extend ActiveSupport::Concern + + included do + class_attribute :lambdakiq_options_hash, + instance_predicate: false, + default: Hash.new + end + + class_methods do + + def lambdakiq_options(options = {}) + self.lambdakiq_options_hash = options.symbolize_keys + end + + end + + def lambdakiq_retry + lambdakiq_options_hash[:retry] + end + + def lambdakiq_async? + !!lambdakiq_options_hash[:async] + end + + end +end diff --git a/test/cases/event_test.rb b/test/cases/event_test.rb new file mode 100644 index 0000000..a1ade77 --- /dev/null +++ b/test/cases/event_test.rb @@ -0,0 +1,16 @@ +require 'test_helper' + +class EventTest < LambdakiqSpec + it '.records' do + event = event_basic + records = Lambdakiq::Event.records(event) + expect(records).must_be_instance_of(Array) + expect(records.length).must_equal(1) + end + + it '.jobs?' do + event = event_basic + jobs = Lambdakiq::Event.jobs?(event) + expect(jobs).must_equal(true) + end +end diff --git a/test/cases/job_test.rb b/test/cases/job_test.rb new file mode 100644 index 0000000..7a52cb5 --- /dev/null +++ b/test/cases/job_test.rb @@ -0,0 +1,141 @@ +require 'test_helper' + +class JobTest < LambdakiqSpec + + it '#active_job - a deserialize representation of what will be executed' do + aj = job.active_job + expect(aj).must_be_instance_of TestHelper::Jobs::BasicJob + expect(aj.job_id).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3' + expect(aj.queue_name).must_equal queue_name + expect(aj.enqueued_at).must_equal '2020-11-30T13:07:36Z' + expect(aj.executions).must_equal 0 + expect(aj.provider_job_id).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807' + end + + it '#active_job - executions uses ApproximateReceiveCount' do + event = event_basic attributes: { ApproximateReceiveCount: '3' } + aj = job(event: event).active_job + expect(aj.executions).must_equal 2 + end + + it 'must perform basic job' do + Lambdakiq::Job.handler(event_basic) + expect(delete_message).must_be :present? + expect(change_message_visibility).must_be_nil + expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"' + expect(logger).must_include 'Performing TestHelper::Jobs::BasicJob' + expect(logger).must_include 'Performed TestHelper::Jobs::BasicJob' + end + + it 'logs cloudwatch embedded metrics' do + Lambdakiq::Job.handler(event_basic) + metric = logged_metric('perform.active_job') + expect(metric).must_be :present? + expect(metric['AppName']).must_equal 'Dummy' + expect(metric['JobName']).must_equal 'TestHelper::Jobs::BasicJob' + expect(metric['Duration']).must_equal 0 + expect(metric['JobId']).must_equal '527cd37e-08f4-4aa8-9834-a46220cdc5a3' + expect(metric['QueueName']).must_equal 'lambdakiq-JobsQueue-TESTING123.fifo' + expect(metric['MessageId']).must_equal '9081fe74-bc79-451f-a03a-2fe5c6e2f807' + expect(metric['JobArg1']).must_equal 'test' + end + + it 'must change message visibility to next value for failed jobs' do + event = event_basic attributes: { ApproximateReceiveCount: '7' }, job_class: 'TestHelper::Jobs::ErrorJob' + expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL' + expect(change_message_visibility).must_be :present? + expect(change_message_visibility_params[:visibility_timeout]).must_equal 1416 + expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"' + expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob' + expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob' + # binding.pry ; return + expect(logged_metric('retry_stopped.active_job')).must_be_nil + enqueue_retry = logged_metric('enqueue_retry.active_job') + expect(enqueue_retry).must_be :present? + expect(enqueue_retry['Executions']).must_equal 7 + expect(enqueue_retry['ExceptionName']).must_equal 'RuntimeError' + end + + it 'wraps returned errors with no backtrace which avoids excessive/duplicate cloudwatch logging' do + event = event_basic job_class: 'TestHelper::Jobs::ErrorJob' + error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL' + expect(error.class.name).must_equal 'Lambdakiq::JobError' + expect(error.backtrace).must_equal [] + expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"' + expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob' + expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob' + end + + it 'must delete message for failed jobs at the end of the queue/message max receive count' do + # See ClientHelpers for setting queue to max receive count of 8. + event = event_basic attributes: { ApproximateReceiveCount: '8' }, job_class: 'TestHelper::Jobs::ErrorJob' + Lambdakiq::Job.handler(event) + expect(delete_message).must_be :present? + expect(perform_buffer_last_value).must_equal 'ErrorJob with: "test"' + expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJob' + expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJob' + expect(logged_metric('enqueue_retry.active_job')).must_be_nil + retry_stopped = logged_metric('retry_stopped.active_job') + expect(retry_stopped).must_be :present? + expect(retry_stopped['Executions']).must_equal 8 + expect(retry_stopped['ExceptionName']).must_equal 'RuntimeError' + end + + it 'must not perform and allow fifo queue to use message visibility as delay' do + event = event_basic_delay minutes: 6 + error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL' + expect(delete_message).must_be :blank? + expect(change_message_visibility).must_be :present? + expect(change_message_visibility_params[:visibility_timeout]).must_be_close_to 6.minutes, 1 + expect(perform_buffer_last_value).must_be_nil + expect(logger).must_be :blank? + end + + it 'must not perform and allow fifo queue to use message visibility as delay (using SentTimestamp)' do + event = event_basic_delay minutes: 10, timestamp: 2.minutes.ago.strftime('%s%3N') + error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL' + expect(delete_message).must_be :blank? + expect(change_message_visibility).must_be :present? + expect(change_message_visibility_params[:visibility_timeout]).must_be_close_to 8.minutes, 1 + expect(perform_buffer_last_value).must_be_nil + expect(logger).must_be :blank? + end + + it 'must perform and allow fifo queue to use message visibility as delay but not when SentTimestamp is too far in the past' do + event = event_basic_delay minutes: 2, timestamp: 3.minutes.ago.strftime('%s%3N') + Lambdakiq::Job.handler(event) + expect(delete_message).must_be :present? + expect(change_message_visibility).must_be_nil + expect(perform_buffer_last_value).must_equal 'BasicJob with: "test"' + expect(logger).must_include 'Performing TestHelper::Jobs::BasicJob' + expect(logger).must_include 'Performed TestHelper::Jobs::BasicJob' + end + + it 'must use `lambdakiq_options` retry options set to 0 and not retry job' do + event = event_basic job_class: 'TestHelper::Jobs::ErrorJobNoRetry' + Lambdakiq::Job.handler(event) + expect(delete_message).must_be :present? + expect(perform_buffer_last_value).must_equal 'ErrorJobNoRetry with: "test"' + expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJobNoRetry' + expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJobNoRetry' + end + + it 'must use `lambdakiq_options` retry options set to 1 and retry job' do + event = event_basic job_class: 'TestHelper::Jobs::ErrorJobOneRetry' + error = expect(->{ Lambdakiq::Job.handler(event) }).must_raise 'HELL' + expect(delete_message).must_be :blank? + expect(perform_buffer_last_value).must_equal 'ErrorJobOneRetry with: "test"' + expect(change_message_visibility).must_be :present? + expect(change_message_visibility_params[:visibility_timeout]).must_equal 30.seconds + expect(logger).must_include 'Performing TestHelper::Jobs::ErrorJobOneRetry' + expect(logger).must_include 'Error performing TestHelper::Jobs::ErrorJobOneRetry' + end + + private + + def job(event: event_basic) + record = Lambdakiq::Event.records(event).first + Lambdakiq::Job.new(record) + end + +end diff --git a/test/cases/jobs/basic_async_job_test.rb b/test/cases/jobs/basic_async_job_test.rb new file mode 100644 index 0000000..070204a --- /dev/null +++ b/test/cases/jobs/basic_async_job_test.rb @@ -0,0 +1,15 @@ +require 'test_helper' + +class BasicAsyncJobTest < LambdakiqSpec + before do + TestHelper::Jobs::BasicAsyncJob.perform_later('somework') + expect(sent_message).must_be :blank? + wait_for('Waiting for sent message API call.') { sent_message } + end + + it 'message body' do + expect(sent_message_body['queue_name']).must_equal queue_name + expect(sent_message_body['job_class']).must_equal 'TestHelper::Jobs::BasicAsyncJob' + expect(sent_message_body['arguments']).must_equal ['somework'] + end +end diff --git a/test/cases/jobs/basic_job_delay_test.rb b/test/cases/jobs/basic_job_delay_test.rb new file mode 100644 index 0000000..df61ce6 --- /dev/null +++ b/test/cases/jobs/basic_job_delay_test.rb @@ -0,0 +1,15 @@ +require 'test_helper' + +class BasicJobDelayTest < LambdakiqSpec + before do + TestHelper::Jobs::BasicJob.set(wait: 5.minutes).perform_later('somework') + expect(sent_message).must_be :present? + end + + it 'message attributes include `delay_seconds` since no wait was set' do + delay_seconds = sent_message_attributes['delay_seconds'] + expect(delay_seconds).must_be :present? + expect(delay_seconds[:data_type]).must_equal 'String' + expect(delay_seconds[:string_value]).must_equal '300' + end +end diff --git a/test/cases/jobs/basic_job_nofifo_delay_test.rb b/test/cases/jobs/basic_job_nofifo_delay_test.rb new file mode 100644 index 0000000..96c26d4 --- /dev/null +++ b/test/cases/jobs/basic_job_nofifo_delay_test.rb @@ -0,0 +1,17 @@ +require 'test_helper' + +class BasicJobNofifoDelayTest < LambdakiqSpec + before do + TestHelper::Jobs::BasicNofifoJob.set(wait: 5.minutes).perform_later('somework') + expect(sent_message).must_be :present? + end + + it 'uses default `delay_seconds` since non-FIFO queues support this natively' do + expect(sent_message_params[:delay_seconds]).must_equal 300 + end + + it 'message attributes exclude `delay_seconds` since non-FIFO queues support this natively' do + delay_seconds = sent_message_attributes['delay_seconds'] + expect(delay_seconds).must_be_nil + end +end diff --git a/test/cases/jobs/basic_job_nofifo_job_test.rb b/test/cases/jobs/basic_job_nofifo_job_test.rb new file mode 100644 index 0000000..c39433c --- /dev/null +++ b/test/cases/jobs/basic_job_nofifo_job_test.rb @@ -0,0 +1,17 @@ +require 'test_helper' + +class BasicJobNofifoTest < LambdakiqSpec + before do + TestHelper::Jobs::BasicNofifoJob.perform_later('somework') + expect(sent_message).must_be :present? + end + + it 'message body has no fifo queue nave vs fifo super class ' do + expect(sent_message_body['queue_name']).must_equal 'lambdakiq-JobsQueue-TESTING123' + end + + it 'message group and deduplication id not used for non fifo queues' do + expect(sent_message_params[:message_group_id]).must_be_nil + expect(sent_message_params[:message_deduplication_id]).must_be_nil + end +end diff --git a/test/cases/jobs/basic_job_test.rb b/test/cases/jobs/basic_job_test.rb new file mode 100644 index 0000000..3797998 --- /dev/null +++ b/test/cases/jobs/basic_job_test.rb @@ -0,0 +1,31 @@ +require 'test_helper' + +class BasicJobTest < LambdakiqSpec + before do + TestHelper::Jobs::BasicJob.perform_later('somework') + expect(sent_message).must_be :present? + end + + it 'message body' do + expect(sent_message_body['queue_name']).must_equal queue_name + expect(sent_message_body['job_class']).must_equal 'TestHelper::Jobs::BasicJob' + expect(sent_message_body['arguments']).must_equal ['somework'] + end + + it 'message attributes identify this as a Lambdakiq job' do + lambdakiq = sent_message_attributes['lambdakiq'] + expect(lambdakiq).must_be_instance_of Hash + expect(lambdakiq[:data_type]).must_equal 'String' + expect(lambdakiq[:string_value]).must_equal '1' + end + + it 'message attributes do not include `delay_seconds` since no wait was set' do + expect(sent_message_attributes.key?('delay_seconds')).must_equal false + end + + it 'message group and deduplication id for default fifo queue are sent' do + expect(sent_message_params[:message_deduplication_id]).must_be :present? + UUID.validate(sent_message_params[:message_deduplication_id]) + expect(sent_message_params[:message_group_id]).must_equal sent_message_params[:message_deduplication_id] + end +end diff --git a/test/cases/queue_test.rb b/test/cases/queue_test.rb new file mode 100644 index 0000000..5efb081 --- /dev/null +++ b/test/cases/queue_test.rb @@ -0,0 +1,13 @@ +require 'test_helper' + +class QueueTest < LambdakiqSpec + let(:queue) { Lambdakiq.client.queues[queue_name] } + + it '#fifo?' do + expect(queue.fifo?).must_equal true + end + + it '#max_receive_count' do + expect(queue.max_receive_count).must_equal 8 + end +end diff --git a/test/cases/record_test.rb b/test/cases/record_test.rb new file mode 100644 index 0000000..3aae93b --- /dev/null +++ b/test/cases/record_test.rb @@ -0,0 +1,38 @@ +require 'test_helper' + +class RecordTest < LambdakiqSpec + let(:event) { event_basic } + let(:records) { Lambdakiq::Event.records(event) } + let(:record) { Lambdakiq::Record.new(records.first) } + + it '#body' do + expect(record.body).must_be_instance_of String + expect(JSON.parse(record.body)).must_be_instance_of Hash + end + + it '#receipt_handle' do + expect(record.receipt_handle).must_be_instance_of String + expect(record.receipt_handle).must_match /AQE.*KtD/ + end + + it '#queue_name' do + expect(record.queue_name).must_equal queue_name + end + + it '#attributes' do + expect(record.attributes).must_be_instance_of Hash + end + + it '#sent_at' do + sent_at = record.sent_at + expect(sent_at).must_be_instance_of ActiveSupport::TimeWithZone + expect(sent_at.year).must_equal 2020 + expect(sent_at.month).must_equal 11 + expect(sent_at.day).must_equal 30 + end + + it '#receive_count' do + expect(record.receive_count).must_equal 1 + end + +end diff --git a/test/dummy_app/config/.keep b/test/dummy_app/config/.keep new file mode 100644 index 0000000..e69de29 diff --git a/test/dummy_app/init.rb b/test/dummy_app/init.rb new file mode 100644 index 0000000..48d455e --- /dev/null +++ b/test/dummy_app/init.rb @@ -0,0 +1,14 @@ +require 'rails/all' + +module Dummy + class Application < ::Rails::Application + config.root = File.join __FILE__, '..' + config.eager_load = true + logger = ActiveSupport::Logger.new(StringIO.new) + logger.formatter = ActiveSupport::Logger::SimpleFormatter.new + config.logger = logger + config.active_job.queue_adapter = :lambdakiq + end +end + +Dummy::Application.initialize! diff --git a/test/lambdakiq_test.rb b/test/lambdakiq_test.rb deleted file mode 100644 index 05dd39f..0000000 --- a/test/lambdakiq_test.rb +++ /dev/null @@ -1,7 +0,0 @@ -require 'test_helper' - -class HandlerTest < LambdakiqSpec - it 'starting off' do - expect('true').must_equal 'true' - end -end diff --git a/test/test_helper.rb b/test/test_helper.rb index 0b001e3..d517723 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,11 +1,46 @@ -$LOAD_PATH.unshift File.expand_path('../../lib', __FILE__) -require 'lambdakiq' -require 'pry' +ENV['RAILS_ENV'] = 'test' +ENV['TEST_QUEUE_NAME'] ||= 'lambdakiq-JobsQueue-TESTING123.fifo' +ENV['BUNDLE_GEMFILE'] ||= File.expand_path('../../Gemfile', __FILE__) +require 'bundler/setup' +Bundler.require :default, :development, :test +require 'rails' +require 'aws-sdk-sqs' +require 'stringio' +require 'timeout' require 'minitest/autorun' require 'minitest/focus' require 'mocha/minitest' -require 'test_helper/event_helpers' +Dir['test/test_helper/*.{rb}'].each { |f| require_relative "../#{f}" } +Lambdakiq::Client.default_options.merge! stub_responses: true +require_relative './dummy_app/init' class LambdakiqSpec < Minitest::Spec + include TestHelper::ClientHelpers, + TestHelper::ApiRequestHelpers, + TestHelper::EventHelpers, + TestHelper::QueueHelpers, + TestHelper::LogHelpers, + TestHelper::PerformHelpers + + before do + client_reset! + client_stub_responses + logger_reset! + perform_buffer_clear! + end + + private + + def wait_for(message, timeout: 2) + Timeout.timeout(timeout) do + loop do + value = yield + value ? break : sleep(0.1) + end + end + rescue Timeout::Error + flunk(message) + end + end diff --git a/test/test_helper/api_request_helpers.rb b/test/test_helper/api_request_helpers.rb new file mode 100644 index 0000000..f17096e --- /dev/null +++ b/test/test_helper/api_request_helpers.rb @@ -0,0 +1,41 @@ +module TestHelper + module ApiRequestHelpers + + private + + def delete_message + api_requests.reverse.detect do |r| + r[:operation_name] == :delete_message + end + end + + def change_message_visibility + api_requests.reverse.detect do |r| + r[:operation_name] == :change_message_visibility + end + end + + def change_message_visibility_params + change_message_visibility[:params] + end + + def sent_message + api_requests.reverse.detect do |r| + r[:operation_name] == :send_message + end + end + + def sent_message_params + sent_message[:params] + end + + def sent_message_body + JSON.parse sent_message_params[:message_body] + end + + def sent_message_attributes + sent_message_params[:message_attributes] + end + + end +end diff --git a/test/test_helper/client_helpers.rb b/test/test_helper/client_helpers.rb new file mode 100644 index 0000000..5e2e5e5 --- /dev/null +++ b/test/test_helper/client_helpers.rb @@ -0,0 +1,34 @@ +module TestHelper + module ClientHelpers + extend ActiveSupport::Concern + + included do + let(:max_receive_count) { 8 } + end + + private + + def client + Lambdakiq.client.sqs + end + + def client_reset! + Lambdakiq.instance_variable_set :@client, nil + end + + def client_stub_responses + client.stub_responses(:get_queue_url, { + queue_url: 'https://sqs.us-stubbed-1.amazonaws.com' + }) + redrive_policy = JSON.dump({maxReceiveCount: max_receive_count.to_s}) + client.stub_responses(:get_queue_attributes, { + attributes: { 'RedrivePolicy' => redrive_policy } + }) + end + + def api_requests + client.api_requests + end + + end +end diff --git a/test/test_helper/event_helpers.rb b/test/test_helper/event_helpers.rb index 91538fd..f695391 100644 --- a/test/test_helper/event_helpers.rb +++ b/test/test_helper/event_helpers.rb @@ -1,2 +1,28 @@ require 'test_helper/events/base' require 'test_helper/events/basic' + +module TestHelper + module EventHelpers + + private + + def event_basic(overrides = {}) + Events::Basic.create(overrides) + end + + def event_basic_delay(minutes: 5, timestamp: Time.current.strftime('%s%3N')) + Events::Basic.create( + attributes: { SentTimestamp: timestamp }, + messageAttributes: { + delay_seconds: { + stringValue: minutes.minutes.to_s, + stringListValues: [], + binaryListValues: [], + dataType: 'String' + } + } + ) + end + + end +end diff --git a/test/test_helper/events/base.rb b/test/test_helper/events/base.rb index 64eaa90..d1ab667 100644 --- a/test/test_helper/events/base.rb +++ b/test/test_helper/events/base.rb @@ -1,4 +1,4 @@ -module TestHelpers +module TestHelper module Events class Base @@ -6,7 +6,13 @@ class Base self.event = Hash.new def self.create(overrides = {}) - event.deep_merge(overrides.stringify_keys) + job_class = overrides.delete(:job_class) + event.deep_dup.tap do |e| + e['Records'].each do |r| + r.deep_merge!(overrides.deep_stringify_keys) + r['body'].sub! 'TestHelper::Jobs::BasicJob', job_class if job_class + end + end end end diff --git a/test/test_helper/events/basic.rb b/test/test_helper/events/basic.rb index 3f572c7..03a318f 100644 --- a/test/test_helper/events/basic.rb +++ b/test/test_helper/events/basic.rb @@ -1,42 +1,40 @@ -module TestHelpers +module TestHelper module Events class Basic < Base - self.event = { - "Records" => [ - { - "messageId" => "c42c6b3a-1c01-48eb-8934-aeb4e0638aa7", - "receiptHandle" => "AQEBsQ60u/KXaRcorTDrqJ6zTs/p5nQ9Bbym4JSTvoW6g4dTMReChX5Quh3OP/+34ZFSgGKCwN8MixfUFag+SCc/SSFcZoBqbPAjHktQ00BnVemjYZp8fS3xHOjczjPNW2Ds1k5ijZn1v+zxwWtzSKSVSAQJVneh0+4p0zfXehKvlQWI8mYIm7ixdml1zPanosbOn50njp3eN6DGOx0QLPwYELViDv0/zSIzSxfsac0jw2waO1o1jtsU87XJ25v46TlBeuGhMKFmJ6fkiUNqTtx75v6FXtbM16W21Jhw6Tbh6+Q=", - "body" => "{\"job_class\" =>\"KiqitJob\",\"job_id\" =>\"24a293dd-18b6-4f07-aa45-337589956826\",\"provider_job_id\" =>null,\"queue_name\" =>\"lambdakiq-jobs.fifo\",\"priority\" =>null,\"arguments\" =>[83],\"executions\" =>0,\"exception_executions\" =>{},\"locale\" =>\"en\",\"timezone\" =>\"UTC\",\"enqueued_at\" =>\"2020-11-28T03 =>03 =>00Z\"}", - "attributes" => { - "ApproximateReceiveCount" => "1", - "SentTimestamp" => "1606532580760", - "SequenceNumber" => "18858016414384115456", - "MessageGroupId" => "ShoryukenMessage", - "SenderId" => "AROA4DJKY67RIRD72L5DE", - "MessageDeduplicationId" => "6f872995370771f172e98af04e09267266f0b618e0d0486c140023afaf689c08", - "ApproximateFirstReceiveTimestamp" => "1606532580760" - }, - "messageAttributes" => { - "shoryuken_class" => { - "stringValue" => "ActiveJob => =>QueueAdapters => =>ShoryukenAdapter => =>JobWrapper", - "stringListValues" => [ - - ], - "binaryListValues" => [ - - ], - "dataType" => "String" - } - }, - "md5OfBody" => "f903390c94cdcca2443b8d0e86422edb", - "md5OfMessageAttributes" => "ff41d67aace8f6c385e8a5071b828b5c", - "eventSource" => "aws =>sqs", - "eventSourceARN" => "arn =>aws =>sqs =>us-east-1 =>831702759394 =>lambdakiq-jobs.fifo", - "awsRegion" => "us-east-1" - } - ] - }.freeze + self.event = JSON.load(' + { + "Records": [ + { + "messageId": "9081fe74-bc79-451f-a03a-2fe5c6e2f807", + "receiptHandle": "AQEBgbn8GmF1fMo4z3IIqlJYymS6e7NBynwE+LsQlzjjdcKtSIomGeKMe0noLC9UDShUSe8bzr0s+pby03stHNRv1hgg4WRB5YT4aO0dwOuio7LvMQ/VW88igQtWmca78K6ixnU9X5Sr6J+/+WMvjBgIdvO0ycAM2tyJ1nxRHs/krUoLo/bFCnnwYh++T5BLQtFjFGrRkPjWnzjAbLWKU6Hxxr5lkHSxGhjfAoTCOjhi9crouXaWD+H1uvoGx/O/ZXaeMNjKIQoKjhFguwbEpvrq2Pfh2x9nRgBP3cKa9qw4Q3oFQ0MiQAvnK+UO8cCnsKtD", + "body": "{\"job_class\":\"TestHelper::Jobs::BasicJob\",\"job_id\":\"527cd37e-08f4-4aa8-9834-a46220cdc5a3\",\"provider_job_id\":null,\"queue_name\":\"lambdakiq-JobsQueue-TESTING123.fifo\",\"priority\":null,\"arguments\":[\"test\"],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2020-11-30T13:07:36Z\"}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1606741656429", + "SequenceNumber": "18858069937755376128", + "MessageGroupId": "527cd37e-08f4-4aa8-9834-a46220cdc5a3", + "SenderId": "AROA4DJKY67RBVYCN5UZ3", + "MessageDeduplicationId": "527cd37e-08f4-4aa8-9834-a46220cdc5a3", + "ApproximateFirstReceiveTimestamp": "1606741656429" + }, + "messageAttributes": { + "lambdakiq": { + "stringValue": "1", + "stringListValues": [], + "binaryListValues": [], + "dataType": "String" + } + }, + "md5OfMessageAttributes": "5fde2d817e4e6b7f28735d3b1725f817", + "md5OfBody": "6477b54fb64dde974ea7514e87d3b8a5", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:831702759394:lambdakiq-JobsQueue-TESTING123.fifo", + "awsRegion": "us-east-1" + } + ] + } + ').freeze end end diff --git a/test/test_helper/jobs.rb b/test/test_helper/jobs.rb new file mode 100644 index 0000000..2ab6626 --- /dev/null +++ b/test/test_helper/jobs.rb @@ -0,0 +1,7 @@ +require 'test_helper/jobs/application_job' +require 'test_helper/jobs/basic_job' +require 'test_helper/jobs/basic_async_job' +require 'test_helper/jobs/basic_nofifo_job' +require 'test_helper/jobs/error_job' +require 'test_helper/jobs/error_job_no_retry' +require 'test_helper/jobs/error_job_one_retry' diff --git a/test/test_helper/jobs/application_job.rb b/test/test_helper/jobs/application_job.rb new file mode 100644 index 0000000..624aefb --- /dev/null +++ b/test/test_helper/jobs/application_job.rb @@ -0,0 +1,8 @@ +module TestHelper + module Jobs + class ApplicationJob < ActiveJob::Base + queue_as ENV['TEST_QUEUE_NAME'] + include Lambdakiq::Worker + end + end +end diff --git a/test/test_helper/jobs/basic_async_job.rb b/test/test_helper/jobs/basic_async_job.rb new file mode 100644 index 0000000..e373e49 --- /dev/null +++ b/test/test_helper/jobs/basic_async_job.rb @@ -0,0 +1,10 @@ +module TestHelper + module Jobs + class BasicAsyncJob < ApplicationJob + lambdakiq_options async: true + def perform(object) + TestHelper::PerformBuffer.add "BasicAsyncJob with: #{object.inspect}" + end + end + end +end diff --git a/test/test_helper/jobs/basic_job.rb b/test/test_helper/jobs/basic_job.rb new file mode 100644 index 0000000..15c21ff --- /dev/null +++ b/test/test_helper/jobs/basic_job.rb @@ -0,0 +1,9 @@ +module TestHelper + module Jobs + class BasicJob < ApplicationJob + def perform(object) + TestHelper::PerformBuffer.add "BasicJob with: #{object.inspect}" + end + end + end +end diff --git a/test/test_helper/jobs/basic_nofifo_job.rb b/test/test_helper/jobs/basic_nofifo_job.rb new file mode 100644 index 0000000..0902009 --- /dev/null +++ b/test/test_helper/jobs/basic_nofifo_job.rb @@ -0,0 +1,10 @@ +module TestHelper + module Jobs + class BasicNofifoJob < ApplicationJob + queue_as ENV['TEST_QUEUE_NAME'].sub('.fifo','') + def perform(object) + TestHelper::PerformBuffer.add "BasicNofifoJob with: #{object.inspect}" + end + end + end +end diff --git a/test/test_helper/jobs/error_job.rb b/test/test_helper/jobs/error_job.rb new file mode 100644 index 0000000..b5b99a7 --- /dev/null +++ b/test/test_helper/jobs/error_job.rb @@ -0,0 +1,10 @@ +module TestHelper + module Jobs + class ErrorJob < ApplicationJob + def perform(object) + TestHelper::PerformBuffer.add "ErrorJob with: #{object.inspect}" + raise('HELL') + end + end + end +end diff --git a/test/test_helper/jobs/error_job_no_retry.rb b/test/test_helper/jobs/error_job_no_retry.rb new file mode 100644 index 0000000..ac1634e --- /dev/null +++ b/test/test_helper/jobs/error_job_no_retry.rb @@ -0,0 +1,11 @@ +module TestHelper + module Jobs + class ErrorJobNoRetry < ApplicationJob + lambdakiq_options retry: 0 + def perform(object) + TestHelper::PerformBuffer.add "ErrorJobNoRetry with: #{object.inspect}" + raise('HELL') + end + end + end +end diff --git a/test/test_helper/jobs/error_job_one_retry.rb b/test/test_helper/jobs/error_job_one_retry.rb new file mode 100644 index 0000000..5d0c344 --- /dev/null +++ b/test/test_helper/jobs/error_job_one_retry.rb @@ -0,0 +1,11 @@ +module TestHelper + module Jobs + class ErrorJobOneRetry < ApplicationJob + lambdakiq_options retry: 1 + def perform(object) + TestHelper::PerformBuffer.add "ErrorJobOneRetry with: #{object.inspect}" + raise('HELL') + end + end + end +end diff --git a/test/test_helper/log_helpers.rb b/test/test_helper/log_helpers.rb new file mode 100644 index 0000000..ccd4849 --- /dev/null +++ b/test/test_helper/log_helpers.rb @@ -0,0 +1,25 @@ +module TestHelper + module LogHelpers + extend ActiveSupport::Concern + + included do + let(:logger) { Rails.logger.instance_variable_get(:@logdev).instance_variable_get(:@dev).string } + end + + private + + def logged_metric(event) + metric = logged_metrics.reverse.detect { |l| l.include?(event) } + JSON.parse(metric) if metric + end + + def logged_metrics + logger.each_line.select { |l| l.include? 'CloudWatchMetrics' } + end + + def logger_reset! + Rails.logger.instance_variable_get(:@logdev).instance_variable_get(:@dev).truncate 0 + end + + end +end diff --git a/test/test_helper/perform_helpers.rb b/test/test_helper/perform_helpers.rb new file mode 100644 index 0000000..be1c686 --- /dev/null +++ b/test/test_helper/perform_helpers.rb @@ -0,0 +1,32 @@ +module TestHelper + module PerformBuffer + def clear + values.clear + end + + def add(value) + values << value + end + + def values + @values ||= [] + end + + def last_value + values.last + end + + extend self + end + module PerformHelpers + private + + def perform_buffer_clear! + PerformBuffer.clear + end + + def perform_buffer_last_value + PerformBuffer.last_value + end + end +end diff --git a/test/test_helper/queue_helpers.rb b/test/test_helper/queue_helpers.rb new file mode 100644 index 0000000..2a3bf80 --- /dev/null +++ b/test/test_helper/queue_helpers.rb @@ -0,0 +1,11 @@ +module TestHelper + module QueueHelpers + + private + + def queue_name + ENV['TEST_QUEUE_NAME'] + end + + end +end