diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 943b111..d289eb5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -65,5 +65,15 @@ jobs: bundle exec rails db:create bundle exec rails db:schema:load + # - name: add Outboxer + # run: | + # echo "gem 'outboxer', git: 'https://github.com/fast-programmer/outboxer.git', branch: 'master'" >> Gemfile + # bundle install + # bin/rails g outboxer:schema + # bin/rake db:migrate + # bin/rake outboxer:db:seed + # bin/rails g outboxer:sidekiq_publisher + # echo "$(sed '$d' lib/event.rb)\n\n # callbacks\n\n after_create do |event|\n Outboxer::Message.queue(messageable: event)\n end\nend" > lib/event.rb + - name: Run tests run: bundle exec rspec diff --git a/Gemfile b/Gemfile index 86b6a54..83f9f3d 100644 --- a/Gemfile +++ b/Gemfile @@ -14,6 +14,8 @@ gem "activerecord" gem "sidekiq" +gem 'outboxer', git: 'https://github.com/fast-programmer/outboxer.git', branch: 'master' + group :development do end diff --git a/Gemfile.lock b/Gemfile.lock index 7da0472..4fa7412 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,3 +1,11 @@ +GIT + remote: https://github.com/fast-programmer/outboxer.git + revision: 70e02414ed68f58e864f738651dfe4a14b947569 + branch: master + specs: + outboxer (1.0.0.pre.beta) + activerecord (>= 7.0.8.6) + GEM remote: https://rubygems.org/ specs: @@ -212,6 +220,7 @@ DEPENDENCIES activerecord database_cleaner-active_record factory_bot_rails + outboxer! pg pry-byebug puma diff --git a/app/jobs/outboxer_integration/message/publish_job.rb b/app/jobs/outboxer_integration/message/publish_job.rb index ab24f92..235b90e 100644 --- a/app/jobs/outboxer_integration/message/publish_job.rb +++ b/app/jobs/outboxer_integration/message/publish_job.rb @@ -19,37 +19,37 @@ def perform(args) when 'Accountify::Invoice::DraftedEvent' Accountify::InvoiceStatusSummary::RegenerateJob.perform_async({ 'tenant_id' => messageable.tenant_id, - 'organisation_id' => messageable.body['organisation']['id'], + 'organisation_id' => messageable.body['invoice']['organisation_id'], 'invoice_updated_at' => messageable.created_at.utc.iso8601 }) when 'Accountify::Invoice::UpdatedEvent' Accountify::InvoiceStatusSummary::RegenerateJob.perform_async({ 'tenant_id' => messageable.tenant_id, - 'organisation_id' => messageable.body['organisation']['id'], + 'organisation_id' => messageable.body['invoice']['organisation_id'], 'invoice_updated_at' => messageable.created_at.utc.iso8601 }) when 'Accountify::Invoice::IssuedEvent' Accountify::InvoiceStatusSummary::RegenerateJob.perform_async({ 'tenant_id' => messageable.tenant_id, - 'organisation_id' => messageable.body['organisation']['id'], + 'organisation_id' => messageable.body['invoice']['organisation_id'], 'invoice_updated_at' => messageable.created_at.utc.iso8601 }) when 'Accountify::Invoice::PaidEvent' Accountify::InvoiceStatusSummary::RegenerateJob.perform_async({ 'tenant_id' => messageable.tenant_id, - 'organisation_id' => messageable.body['organisation']['id'], + 'organisation_id' => messageable.body['invoice']['organisation_id'], 'invoice_updated_at' => messageable.created_at.utc.iso8601 }) when 'Accountify::Invoice::VoidedEvent' Accountify::InvoiceStatusSummary::RegenerateJob.perform_async({ 'tenant_id' => messageable.tenant_id, - 'organisation_id' => messageable.body['organisation']['id'], + 'organisation_id' => messageable.body['invoice']['organisation_id'], 'invoice_updated_at' => messageable.created_at.utc.iso8601 }) when 'Accountify::Invoice::DeletedEvent' Accountify::InvoiceStatusSummary::RegenerateJob.perform_async({ 'tenant_id' => messageable.tenant_id, - 'organisation_id' => messageable.body['organisation']['id'], + 'organisation_id' => messageable.body['invoice']['organisation_id'], 'invoice_updated_at' => messageable.created_at.utc.iso8601 }) end end diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher new file mode 100755 index 0000000..b9cd10e --- /dev/null +++ b/bin/outboxer_publisher @@ -0,0 +1,38 @@ +#!/usr/bin/env ruby + +require 'bundler/setup' +require 'sidekiq' +require 'outboxer' + +require_relative '../app/jobs/outboxer_integration/message/publish_job' + +options = { + env: ENV.fetch('OUTBOXER_ENV', 'development'), + buffer: ENV.fetch('OUTBOXER_BUFFER', 1000).to_i, + concurrency: ENV.fetch('OUTBOXER_CONCURRENCY', 1).to_i, + poll: ENV.fetch('OUTBOXER_POLL', 5.0).to_f, + tick: ENV.fetch('OUTBOXER_TICK', 0.1).to_f, + heartbeat: ENV.fetch('OUTBOXER_HEARTBEAT', 5.0).to_f, + log_level: ENV.fetch('OUTBOXER_LOG_LEVEL', 'INFO'), + redis_url: ENV.fetch('OUTBOXER_REDIS_URL', 'redis://localhost:6379/0') +} + +Sidekiq.configure_client do |config| + config.redis = { url: options[:redis_url], size: options[:concurrency] } +end + +logger = Outboxer::Logger.new($stdout, level: options[:log_level]) + +Outboxer::Publisher.publish( + env: options[:env], + buffer: options[:buffer], + concurrency: options[:concurrency], + poll: options[:poll], + tick: options[:tick], + heartbeat: options[:heartbeat], + logger: logger +) do |message| + OutboxerIntegration::Message::PublishJob.perform_async({ + 'messageable_id' => message[:messageable_id], 'messageable_type' => message[:messageable_type] + }) +end diff --git a/db/migrate/20241214080817_create_outboxer_settings.rb b/db/migrate/20241214080817_create_outboxer_settings.rb new file mode 100644 index 0000000..e1e43c3 --- /dev/null +++ b/db/migrate/20241214080817_create_outboxer_settings.rb @@ -0,0 +1,18 @@ +class CreateOutboxerSettings < ActiveRecord::Migration[6.1] + def up + ActiveRecord::Base.transaction do + create_table :outboxer_settings do |t| + t.string :name, limit: 255, null: false + t.string :value, limit: 255, null: false + + t.timestamps + end + + add_index :outboxer_settings, :name, unique: true + end + end + + def down + drop_table :outboxer_settings + end +end diff --git a/db/migrate/20241214080818_create_outboxer_messages.rb b/db/migrate/20241214080818_create_outboxer_messages.rb new file mode 100644 index 0000000..88a3d52 --- /dev/null +++ b/db/migrate/20241214080818_create_outboxer_messages.rb @@ -0,0 +1,40 @@ +class CreateOutboxerMessages < ActiveRecord::Migration[6.1] + def up + create_table :outboxer_messages do |t| + t.string :status, limit: 255, null: false + + t.string :messageable_id, limit: 255, null: false + t.string :messageable_type, limit: 255, null: false + + t.datetime :queued_at, precision: 6, null: false + + t.datetime :buffered_at, precision: 6 + + t.datetime :publishing_at, precision: 6 + + t.datetime :updated_at, precision: 6, null: false + + t.bigint :publisher_id + t.string :publisher_name, limit: 263 # 255 (hostname) + 1 (colon) + 7 (pid) + end + + # messages by status count + add_index :outboxer_messages, :status, name: 'idx_outboxer_status' + + # messages by status latency + add_index :outboxer_messages, [:status, :updated_at], + name: 'idx_outboxer_status_updated_at' + + # publisher latency + add_index :outboxer_messages, [:publisher_id, :updated_at], + name: 'idx_outboxer_pub_id_updated_at' + + # publisher throughput + add_index :outboxer_messages, [:status, :publisher_id, :updated_at], + name: 'idx_outboxer_status_pub_id_updated_at' + end + + def down + drop_table :outboxer_messages if table_exists?(:outboxer_messages) + end +end diff --git a/db/migrate/20241214080819_create_outboxer_exceptions.rb b/db/migrate/20241214080819_create_outboxer_exceptions.rb new file mode 100644 index 0000000..369215b --- /dev/null +++ b/db/migrate/20241214080819_create_outboxer_exceptions.rb @@ -0,0 +1,20 @@ +class CreateOutboxerExceptions < ActiveRecord::Migration[6.1] + def up + ActiveRecord::Base.transaction do + create_table :outboxer_exceptions do |t| + t.references :message, foreign_key: { to_table: :outboxer_messages }, null: false + + t.string :class_name, limit: 255, null: false + t.text :message_text, null: false + + t.timestamps + end + + remove_column :outboxer_exceptions, :updated_at + end + end + + def down + drop_table :outboxer_exceptions if table_exists?(:outboxer_exceptions) + end +end diff --git a/db/migrate/20241214080820_create_outboxer_frames.rb b/db/migrate/20241214080820_create_outboxer_frames.rb new file mode 100644 index 0000000..1e7f7ec --- /dev/null +++ b/db/migrate/20241214080820_create_outboxer_frames.rb @@ -0,0 +1,16 @@ +class CreateOutboxerFrames < ActiveRecord::Migration[6.1] + def up + create_table :outboxer_frames do |t| + t.references :exception, foreign_key: { to_table: :outboxer_exceptions }, null: false + + t.integer :index, null: false + t.text :text, null: false + + t.index [:exception_id, :index], unique: true + end + end + + def down + drop_table :outboxer_frames if table_exists?(:outboxer_frames) + end +end diff --git a/db/migrate/20241214080821_create_outboxer_publishers.rb b/db/migrate/20241214080821_create_outboxer_publishers.rb new file mode 100644 index 0000000..f5c7a4e --- /dev/null +++ b/db/migrate/20241214080821_create_outboxer_publishers.rb @@ -0,0 +1,18 @@ +class CreateOutboxerPublishers < ActiveRecord::Migration[6.1] + def up + ActiveRecord::Base.transaction do + create_table :outboxer_publishers do |t| + t.string :name, limit: 263, null: false # 255 (hostname) + 1 (colon) + 7 (pid) + t.string :status, limit: 255, null: false + t.json :settings, null: false + t.json :metrics, null: false + + t.timestamps + end + end + end + + def down + drop_table :outboxer_publishers + end +end diff --git a/db/migrate/20241214080822_create_outboxer_signals.rb b/db/migrate/20241214080822_create_outboxer_signals.rb new file mode 100644 index 0000000..747d475 --- /dev/null +++ b/db/migrate/20241214080822_create_outboxer_signals.rb @@ -0,0 +1,16 @@ +class CreateOutboxerSignals < ActiveRecord::Migration[6.1] + def up + ActiveRecord::Base.transaction do + create_table :outboxer_signals do |t| + t.string :name, limit: 9, null: false + t.references :publisher, foreign_key: { to_table: :outboxer_publishers }, null: false + + t.datetime :created_at, null: false + end + end + end + + def down + drop_table :outboxer_signals + end +end diff --git a/db/schema.rb b/db/schema.rb index 4c88e5a..d958d69 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.0].define(version: 2024_07_06_053510) do +ActiveRecord::Schema[7.0].define(version: 2024_12_14_080822) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -94,9 +94,68 @@ t.index ["eventable_type", "eventable_id"], name: "index_events_on_eventable_type_and_eventable_id" end + create_table "outboxer_exceptions", force: :cascade do |t| + t.bigint "message_id", null: false + t.string "class_name", limit: 255, null: false + t.text "message_text", null: false + t.datetime "created_at", null: false + t.index ["message_id"], name: "index_outboxer_exceptions_on_message_id" + end + + create_table "outboxer_frames", force: :cascade do |t| + t.bigint "exception_id", null: false + t.integer "index", null: false + t.text "text", null: false + t.index ["exception_id", "index"], name: "index_outboxer_frames_on_exception_id_and_index", unique: true + t.index ["exception_id"], name: "index_outboxer_frames_on_exception_id" + end + + create_table "outboxer_messages", force: :cascade do |t| + t.string "status", limit: 255, null: false + t.string "messageable_id", limit: 255, null: false + t.string "messageable_type", limit: 255, null: false + t.datetime "queued_at", null: false + t.datetime "buffered_at" + t.datetime "publishing_at" + t.datetime "updated_at", null: false + t.bigint "publisher_id" + t.string "publisher_name", limit: 263 + t.index ["publisher_id", "updated_at"], name: "idx_outboxer_pub_id_updated_at" + t.index ["status", "publisher_id", "updated_at"], name: "idx_outboxer_status_pub_id_updated_at" + t.index ["status", "updated_at"], name: "idx_outboxer_status_updated_at" + t.index ["status"], name: "idx_outboxer_status" + end + + create_table "outboxer_publishers", force: :cascade do |t| + t.string "name", limit: 263, null: false + t.string "status", limit: 255, null: false + t.json "settings", null: false + t.json "metrics", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "outboxer_settings", force: :cascade do |t| + t.string "name", limit: 255, null: false + t.string "value", limit: 255, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["name"], name: "index_outboxer_settings_on_name", unique: true + end + + create_table "outboxer_signals", force: :cascade do |t| + t.string "name", limit: 9, null: false + t.bigint "publisher_id", null: false + t.datetime "created_at", precision: nil, null: false + t.index ["publisher_id"], name: "index_outboxer_signals_on_publisher_id" + end + add_foreign_key "accountify_contacts", "accountify_organisations", column: "organisation_id" add_foreign_key "accountify_invoice_line_items", "accountify_invoices", column: "invoice_id" add_foreign_key "accountify_invoice_status_summaries", "accountify_organisations", column: "organisation_id" add_foreign_key "accountify_invoices", "accountify_contacts", column: "contact_id" add_foreign_key "accountify_invoices", "accountify_organisations", column: "organisation_id" + add_foreign_key "outboxer_exceptions", "outboxer_messages", column: "message_id" + add_foreign_key "outboxer_frames", "outboxer_exceptions", column: "exception_id" + add_foreign_key "outboxer_signals", "outboxer_publishers", column: "publisher_id" end diff --git a/lib/event.rb b/lib/event.rb index 8269f43..1b6e909 100644 --- a/lib/event.rb +++ b/lib/event.rb @@ -9,4 +9,10 @@ class Event < ActiveRecord::Base # associations belongs_to :eventable, polymorphic: true + + # callbacks + + after_create do |event| + Outboxer::Message.queue(messageable: event) + end end diff --git a/script/accountify/invoice/test_lifecycle.rb b/script/accountify/invoice/test_lifecycle.rb index 0baaf43..93e5e70 100644 --- a/script/accountify/invoice/test_lifecycle.rb +++ b/script/accountify/invoice/test_lifecycle.rb @@ -1,7 +1,7 @@ -require_relative '../../../config/environment' - require 'open3' +require_relative '../../../config/environment' + user_id = 123 tenant_id = 456 @@ -69,9 +69,15 @@ Accountify::Invoice.delete(user_id: user_id, tenant_id: tenant_id, id: invoice[:id]) -puts "Starting Sidekiq..." -sidekiq_cmd = "bundle exec sidekiq -r ./config/sidekiq.rb" -sidekiq_process = IO.popen(sidekiq_cmd) +outboxer_env = ENV['OUTBOXER_ENV'] || ENV['RAILS_ENV'] || 'development' + +sidekiq_server_cmd = "RAILS_ENV=#{outboxer_env} bundle exec sidekiq -r ./config/sidekiq.rb" +puts sidekiq_server_cmd +sidekiq_server_process = IO.popen(sidekiq_server_cmd) + +outboxer_publisher_cmd = "OUTBOXER_ENV=#{outboxer_env} bin/outboxer_publisher" +puts outboxer_publisher_cmd +outboxer_publisher_process = IO.popen(outboxer_publisher_cmd) begin invoice_status_summary = nil @@ -93,13 +99,16 @@ end if invoice_status_summary.nil? - raise Accountify::NotFound, "Invoice status summary not found after #{max_attempts} attempts." + raise Accountify::NotFound, "Invoice status summary not found after #{max_attempts} attempts" end ensure - puts "Stopping Sidekiq..." + puts "Stopping outboxer publisher..." + Process.kill("TERM", outboxer_publisher_process.pid) + Process.wait(outboxer_publisher_process.pid) - Process.kill("TERM", sidekiq_process.pid) - Process.wait(sidekiq_process.pid) + puts "Stopping sidekiq server..." + Process.kill("TERM", sidekiq_server_process.pid) + Process.wait(sidekiq_server_process.pid) end # bundle exec ruby script/accountify/invoice/test_lifecycle.rb diff --git a/spec/integration/accountify/invoice/test_lifecycle_spec.rb b/spec/integration/accountify/invoice/test_lifecycle_spec.rb index 90c1467..c061db2 100644 --- a/spec/integration/accountify/invoice/test_lifecycle_spec.rb +++ b/spec/integration/accountify/invoice/test_lifecycle_spec.rb @@ -1,7 +1,7 @@ require 'rails_helper' RSpec.describe 'Invoice Lifecycle', type: :integration do - xit 'transitions as expected' do + it 'transitions as expected' do Sidekiq::Testing.disable! begin diff --git a/spec/jobs/outboxer_integration/message/publish_job_spec.rb b/spec/jobs/outboxer_integration/message/publish_job_spec.rb index 28aa3d3..f751bc1 100644 --- a/spec/jobs/outboxer_integration/message/publish_job_spec.rb +++ b/spec/jobs/outboxer_integration/message/publish_job_spec.rb @@ -57,7 +57,7 @@ module Message eventable: accountify_organisation, created_at: current_time.utc, body: { - 'organisation' => { 'id' => accountify_organisation.id } }) + 'invoice' => { 'organisation_id' => accountify_organisation.id } }) end before do @@ -86,7 +86,7 @@ module Message eventable: accountify_organisation, created_at: current_time.utc, body: { - 'organisation' => { 'id' => accountify_organisation.id } }) + 'invoice' => { 'organisation_id' => accountify_organisation.id } }) end before do @@ -115,7 +115,7 @@ module Message eventable: accountify_organisation, created_at: current_time.utc, body: { - 'organisation' => { 'id' => accountify_organisation.id } }) + 'invoice' => { 'organisation_id' => accountify_organisation.id } }) end before do @@ -144,7 +144,7 @@ module Message eventable: accountify_organisation, created_at: current_time.utc, body: { - 'organisation' => { 'id' => accountify_organisation.id } }) + 'invoice' => { 'organisation_id' => accountify_organisation.id } }) end before do @@ -173,7 +173,7 @@ module Message eventable: accountify_organisation, created_at: current_time.utc, body: { - 'organisation' => { 'id' => accountify_organisation.id } }) + 'invoice' => { 'organisation_id' => accountify_organisation.id } }) end before do @@ -202,7 +202,7 @@ module Message eventable: accountify_organisation, created_at: current_time.utc, body: { - 'organisation' => { 'id' => accountify_organisation.id } }) + 'invoice' => { 'organisation_id' => accountify_organisation.id } }) end before do