From 4ff34a6a6f5e1a9062e8808d52d59d3feaa65e3e Mon Sep 17 00:00:00 2001 From: Nathan Griffith Date: Tue, 23 Aug 2022 18:42:42 -0400 Subject: [PATCH] Journaled 5.0: Transactional batching (#25) This adds transactional batching to journaled. What does that mean? Well, now, by default, when multiple events are emitted inside of a transaction: ```ruby ActiveRecord::Base.transaction do event_1.journal! event_2.journal! end ``` A single job will be enqueued directly before the SQL `COMMIT` statement, batching up the two events. (And if the transaction rolls back, the job will never be enqueued.) This can be disabled with a global config (for testing purposes -- we should eventually remove this config if we find no downsides in the new behavior): ```ruby Journaled.transactional_batching_enabled = !Rails.env.production? ``` What happens if we aren't in a transaction? Well, the same thing that happened before! (A job will be enqueued right away.) (**I'm looking into adding an additional transactional safety check that would raise an error if `journal!` is called outside of a transaction!** But I'll save that for a future PR.) Because this bumps the version to 5.0, it also removes compatibility for `Journaled::DeliveryJob` jobs enqueued with legacy (3.1.0-era) arguments. This job now accepts a list of events to emit, rather than a single event-kwarg-blob, so a legacy input pattern is still accepted for compatibility with jobs enqueued by v4.x.x of the gem. --- README.md | 18 ++ UPGRADING | 19 ++ app/jobs/journaled/delivery_job.rb | 45 ++--- app/models/journaled/writer.rb | 48 +++-- journaled.gemspec | 3 +- lib/journaled.rb | 26 ++- lib/journaled/connection.rb | 48 +++++ lib/journaled/engine.rb | 5 + lib/journaled/errors.rb | 3 + lib/journaled/transaction_ext.rb | 31 +++ lib/journaled/version.rb | 2 +- spec/jobs/journaled/delivery_job_spec.rb | 106 ++++------ spec/lib/journaled/connection_spec.rb | 45 +++++ spec/models/journaled/writer_spec.rb | 246 +++++++++++++++++++++++ spec/spec_helper.rb | 2 + spec/support/environment_spec_helper.rb | 2 +- 16 files changed, 523 insertions(+), 126 deletions(-) create mode 100644 UPGRADING create mode 100644 lib/journaled/connection.rb create mode 100644 lib/journaled/errors.rb create mode 100644 lib/journaled/transaction_ext.rb create mode 100644 spec/lib/journaled/connection_spec.rb diff --git a/README.md b/README.md index c59b8d1..5745fe8 100644 --- a/README.md +++ b/README.md @@ -444,6 +444,24 @@ gem version. As such, **we always recommend upgrading only one major version at a time.** +### Upgrading from 4.3.0 + +Versions of Journaled prior to 5.0 would enqueue events one at a time, but 5.0 +introduces a new transaction-aware feature that will bundle up all events +emitted within a transaction and enqueue them all in a single "batch" job +directly before the SQL `COMMIT` statement. This reduces the database impact of +emitting a large volume of events at once. + +This feature can be disabled conditionally: + +```ruby +Journaled.transactional_batching_enabled = false +``` + +Backwards compatibility has been included for background jobs enqueued by +version 4.0 and above, but **has been dropped for jobs emitted by versions prior +to 4.0**. (Again, be sure to upgrade only one major version at a time.) + ### Upgrading from 3.1.0 Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly. 
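For quick reference, here is a minimal sketch of the batching behavior described above. It assumes a Rails app running journaled 5.0; `event_1` and `event_2` are placeholder instances of classes that `include Journaled::Event` and are not defined anywhere in this patch:

```ruby
# Sketch only: `event_1` and `event_2` are placeholder Journaled::Event instances.

# Inside a transaction, each journal! call stages its event; a single
# Journaled::DeliveryJob carrying both events is enqueued just before COMMIT.
ActiveRecord::Base.transaction do
  event_1.journal! # staged, nothing enqueued yet
  event_2.journal! # staged, nothing enqueued yet
end
# => 1 batch job enqueued

# If the transaction rolls back, the staged events are discarded and no job
# is ever enqueued.
ActiveRecord::Base.transaction do
  event_1.journal!
  raise ActiveRecord::Rollback
end
# => 0 jobs enqueued

# Outside a transaction, behavior is unchanged from 4.x: a job is enqueued
# immediately for each event.
event_1.journal!
# => 1 job enqueued right away

# Batching can be switched off globally (e.g. while evaluating the rollout):
Journaled.transactional_batching_enabled = false
```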
diff --git a/UPGRADING b/UPGRADING new file mode 100644 index 0000000..bafbe4d --- /dev/null +++ b/UPGRADING @@ -0,0 +1,19 @@ +============================ +NOTE FOR UPGRADING JOURNALED +============================ + +If you are upgrading from an older `journaled` version, please be sure to +increment only ONE major version at a time. + +⚠️ IF YOU ARE UPGRADING FROM 3.1 OR EARLIER, you should NOT USE THIS VERSION. ⚠️ + +Instead, install a version of the gem that is backwards compatible with your +app's currently-enqueued journaled jobs: + +gem 'journaled', '~> 4.2.0' # upgrading from 3.0-3.1 +gem 'journaled', '~> 3.1.0' # upgrading from 2.0-2.5 + +For additional upgrade instructions (e.g. how to handle a few BREAKING CHANGES +to environment variables), please see the README: +https://github.com/Betterment/journaled/blob/v5.0.0/README.md#upgrades + diff --git a/app/jobs/journaled/delivery_job.rb b/app/jobs/journaled/delivery_job.rb index d7e0826..c0e74d9 100644 --- a/app/jobs/journaled/delivery_job.rb +++ b/app/jobs/journaled/delivery_job.rb @@ -12,26 +12,11 @@ class DeliveryJob < ApplicationJob raise KinesisTemporaryFailure end - UNSPECIFIED = Object.new - private_constant :UNSPECIFIED - - def perform(serialized_event:, partition_key:, stream_name: UNSPECIFIED, app_name: UNSPECIFIED) - @serialized_event = serialized_event - @partition_key = partition_key - if app_name != UNSPECIFIED - @stream_name = self.class.legacy_computed_stream_name(app_name: app_name) - elsif stream_name != UNSPECIFIED && !stream_name.nil? - @stream_name = stream_name - else - raise(ArgumentError, 'missing keyword: stream_name') - end - - journal! - end + def perform(*events, **legacy_kwargs) + events << legacy_kwargs if legacy_kwargs.present? + @kinesis_records = events.map { |e| KinesisRecord.new(**e.delete_if { |_k, v| v.nil? }) } - def self.legacy_computed_stream_name(app_name:) - env_var_name = [app_name&.upcase, 'JOURNALED_STREAM_NAME'].compact.join('_') - ENV.fetch(env_var_name) + journal! if Journaled.enabled? end def kinesis_client_config @@ -46,18 +31,22 @@ def kinesis_client_config private - attr_reader :serialized_event, :partition_key, :stream_name + KinesisRecord = Struct.new(:serialized_event, :partition_key, :stream_name, keyword_init: true) do + def initialize(serialized_event:, partition_key:, stream_name:) + super(serialized_event: serialized_event, partition_key: partition_key, stream_name: stream_name) + end - def journal! - kinesis_client.put_record record if Journaled.enabled? + def to_h + { stream_name: stream_name, data: serialized_event, partition_key: partition_key } + end end - def record - { - stream_name: stream_name, - data: serialized_event, - partition_key: partition_key, - } + attr_reader :kinesis_records + + def journal! + kinesis_records.map do |record| + kinesis_client.put_record(**record.to_h) + end end def kinesis_client diff --git a/app/models/journaled/writer.rb b/app/models/journaled/writer.rb index bc172d6..c6db5a5 100644 --- a/app/models/journaled/writer.rb +++ b/app/models/journaled/writer.rb @@ -26,8 +26,34 @@ def initialize(journaled_event:) def journal! validate! - ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: journaled_event, priority: job_opts[:priority]) do - Journaled::DeliveryJob.set(job_opts).perform_later(**delivery_perform_args) + + ActiveSupport::Notifications.instrument('journaled.event.stage', event: journaled_event, **journaled_enqueue_opts) do + if Journaled::Connection.available? 
+ Journaled::Connection.stage!(journaled_event) + else + self.class.enqueue!(journaled_event) + end + end + end + + def self.enqueue!(*events) + events.group_by(&:journaled_enqueue_opts).each do |enqueue_opts, batch| + job_opts = enqueue_opts.reverse_merge(priority: Journaled.job_priority) + ActiveSupport::Notifications.instrument('journaled.batch.enqueue', batch: batch, **job_opts) do + Journaled::DeliveryJob.set(job_opts).perform_later(*delivery_perform_args(batch)) + + batch.each { |event| ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: event, **job_opts) } + end + end + end + + def self.delivery_perform_args(events) + events.map do |event| + { + serialized_event: event.journaled_attributes.to_json, + partition_key: event.journaled_partition_key, + stream_name: event.journaled_stream_name, + } end end @@ -38,27 +64,13 @@ def journal! delegate(*EVENT_METHOD_NAMES, to: :journaled_event) def validate! + serialized_event = journaled_event.journaled_attributes.to_json + schema_validator('base_event').validate! serialized_event schema_validator('tagged_event').validate! serialized_event if journaled_event.tagged? schema_validator(journaled_schema_name).validate! serialized_event end - def job_opts - journaled_enqueue_opts.reverse_merge(priority: Journaled.job_priority) - end - - def delivery_perform_args - { - serialized_event: serialized_event, - partition_key: journaled_partition_key, - stream_name: journaled_stream_name, - } - end - - def serialized_event - @serialized_event ||= journaled_attributes.to_json - end - def schema_validator(schema_name) Journaled::JsonSchemaModel::Validator.new(schema_name) end diff --git a/journaled.gemspec b/journaled.gemspec index d0aafd7..5fb8460 100644 --- a/journaled.gemspec +++ b/journaled.gemspec @@ -16,10 +16,11 @@ Gem::Specification.new do |s| s.metadata['rubygems_mfa_required'] = 'true' s.files = Dir["{app,config,lib,journaled_schemas}/**/*", "LICENSE", "Rakefile", "README.md"] - s.test_files = Dir["spec/**/*"] s.required_ruby_version = ">= 2.6" + s.post_install_message = File.read("UPGRADING") if File.exist?('UPGRADING') + s.add_dependency "activejob" s.add_dependency "activerecord" s.add_dependency "aws-sdk-kinesis", "< 2" diff --git a/lib/journaled.rb b/lib/journaled.rb index 939faa2..19c57e0 100644 --- a/lib/journaled.rb +++ b/lib/journaled.rb @@ -4,6 +4,8 @@ require "journaled/engine" require "journaled/current" +require "journaled/errors" +require 'journaled/connection' module Journaled SUPPORTED_QUEUE_ADAPTERS = %w(delayed delayed_job good_job que).freeze @@ -14,32 +16,36 @@ module Journaled mattr_accessor(:http_open_timeout) { 2 } mattr_accessor(:http_read_timeout) { 60 } mattr_accessor(:job_base_class_name) { 'ActiveJob::Base' } + mattr_accessor(:transactional_batching_enabled) { true } - def development_or_test? + def self.development_or_test? %w(development test).include?(Rails.env) end - def enabled? + def self.enabled? ['0', 'false', false, 'f', ''].exclude?(ENV.fetch('JOURNALED_ENABLED', !development_or_test?)) end - def schema_providers + def self.schema_providers @schema_providers ||= [Journaled::Engine, Rails] end - def commit_hash + def self.commit_hash ENV.fetch('GIT_COMMIT') end - def actor_uri + def self.actor_uri Journaled::ActorUriProvider.instance.actor_uri end - def detect_queue_adapter! 
- adapter = job_base_class_name.constantize.queue_adapter_name - unless SUPPORTED_QUEUE_ADAPTERS.include?(adapter) + def self.queue_adapter + job_base_class_name.constantize.queue_adapter_name + end + + def self.detect_queue_adapter! + unless SUPPORTED_QUEUE_ADAPTERS.include?(queue_adapter) raise <<~MSG - Journaled has detected an unsupported ActiveJob queue adapter: `:#{adapter}` + Journaled has detected an unsupported ActiveJob queue adapter: `:#{queue_adapter}` Journaled jobs must be enqueued transactionally to your primary database. @@ -62,6 +68,4 @@ def self.tagged(**tags) def self.tag!(**tags) Current.tags = Current.tags.merge(tags) end - - module_function :development_or_test?, :enabled?, :schema_providers, :commit_hash, :actor_uri, :detect_queue_adapter! end diff --git a/lib/journaled/connection.rb b/lib/journaled/connection.rb new file mode 100644 index 0000000..5301a3b --- /dev/null +++ b/lib/journaled/connection.rb @@ -0,0 +1,48 @@ +module Journaled + module Connection + class << self + def available? + Journaled.transactional_batching_enabled && transaction_open? + end + + def stage!(event) + raise TransactionSafetyError, <<~MSG unless transaction_open? + Transaction not available! By default, journaled event batching requires an open database transaction. + MSG + + connection.current_transaction._journaled_staged_events << event + end + + private + + def transaction_open? + connection.transaction_open? + end + + def connection + if Journaled.queue_adapter.in? %w(delayed delayed_job) + Delayed::Job.connection + elsif Journaled.queue_adapter == 'good_job' + GoodJob::BaseRecord.connection + elsif Journaled.queue_adapter == 'que' + Que::ActiveRecord::Model.connection + elsif Journaled.queue_adapter == 'test' && Rails.env.test? + ActiveRecord::Base.connection + else + raise "Unsupported adapter: #{Journaled.queue_adapter}" + end + end + end + + module TestOnlyBehaviors + def transaction_open? + # Transactional fixtures wrap all tests in an outer, non-joinable transaction: + super && (connection.open_transactions > 1 || connection.current_transaction.joinable?) + end + end + + class << self + prepend TestOnlyBehaviors if Rails.env.test? + end + end +end diff --git a/lib/journaled/engine.rb b/lib/journaled/engine.rb index e27fb58..c3ee996 100644 --- a/lib/journaled/engine.rb +++ b/lib/journaled/engine.rb @@ -4,6 +4,11 @@ class Engine < ::Rails::Engine ActiveSupport.on_load(:active_job) do Journaled.detect_queue_adapter! unless Journaled.development_or_test? end + + ActiveSupport.on_load(:active_record) do + require 'journaled/transaction_ext' + ActiveRecord::ConnectionAdapters::Transaction.prepend Journaled::TransactionExt + end end end end diff --git a/lib/journaled/errors.rb b/lib/journaled/errors.rb new file mode 100644 index 0000000..3697abb --- /dev/null +++ b/lib/journaled/errors.rb @@ -0,0 +1,3 @@ +module Journaled + class TransactionSafetyError < StandardError; end +end diff --git a/lib/journaled/transaction_ext.rb b/lib/journaled/transaction_ext.rb new file mode 100644 index 0000000..0aad406 --- /dev/null +++ b/lib/journaled/transaction_ext.rb @@ -0,0 +1,31 @@ +require 'active_record/connection_adapters/abstract/transaction' + +module Journaled + module TransactionExt + def initialize(*, **) + super.tap do + raise TransactionSafetyError, <<~MSG unless instance_variable_defined?(:@run_commit_callbacks) + Journaled::TransactionExt expects @run_commit_callbacks to be defined on Transaction! + This is an internal API that may have changed in a recent Rails release. 
+ If you were not expecting to see this error, please file an issue here: + https://github.com/Betterment/journaled/issues + MSG + end + end + + def before_commit_records + super.tap do + Writer.enqueue!(*_journaled_staged_events) if @run_commit_callbacks + end + end + + def commit_records + connection.current_transaction._journaled_staged_events.push(*_journaled_staged_events) unless @run_commit_callbacks + super + end + + def _journaled_staged_events + @_journaled_staged_events ||= [] + end + end +end diff --git a/lib/journaled/version.rb b/lib/journaled/version.rb index 63205f4..3a0a731 100644 --- a/lib/journaled/version.rb +++ b/lib/journaled/version.rb @@ -1,3 +1,3 @@ module Journaled - VERSION = "4.3.0".freeze + VERSION = "5.0.0".freeze end diff --git a/spec/jobs/journaled/delivery_job_spec.rb b/spec/jobs/journaled/delivery_job_spec.rb index b1123e2..1bf62e1 100644 --- a/spec/jobs/journaled/delivery_job_spec.rb +++ b/spec/jobs/journaled/delivery_job_spec.rb @@ -1,11 +1,13 @@ require 'rails_helper' RSpec.describe Journaled::DeliveryJob do - let(:stream_name) { 'test_events' } - let(:partition_key) { 'fake_partition_key' } - let(:serialized_event) { '{"foo":"bar"}' } let(:kinesis_client) { Aws::Kinesis::Client.new(stub_responses: true) } - let(:args) { { serialized_event: serialized_event, partition_key: partition_key, stream_name: stream_name } } + let(:args) do + [ + { serialized_event: '{"foo":"bar"}', partition_key: 'fake_partition_key', stream_name: 'test_events' }, + { serialized_event: '{"baz":"bat"}', partition_key: 'fake_partition_key_2', stream_name: 'test_events_2' }, + ] + end describe '#perform' do let(:return_status_body) { { shard_id: '101', sequence_number: '101123' } } @@ -21,15 +23,21 @@ end it 'makes requests to AWS to put the event on the Kinesis with the correct body' do - event = described_class.perform_now(**args) + events = described_class.perform_now(*args) - expect(event.shard_id).to eq '101' - expect(event.sequence_number).to eq '101123' + expect(events.count).to eq 2 + expect(events.first.shard_id).to eq '101' + expect(events.first.sequence_number).to eq '101123' expect(kinesis_client).to have_received(:put_record).with( stream_name: 'test_events', data: '{"foo":"bar"}', partition_key: 'fake_partition_key', ) + expect(kinesis_client).to have_received(:put_record).with( + stream_name: 'test_events_2', + data: '{"baz":"bat"}', + partition_key: 'fake_partition_key_2', + ) end context 'when JOURNALED_IAM_ROLE_ARN is defined' do @@ -60,7 +68,7 @@ end it 'initializes a Kinesis client with assume role credentials' do - described_class.perform_now(**args) + described_class.perform_now(*args) expect(Aws::AssumeRoleCredentials).to have_received(:new).with( client: aws_sts_client, @@ -71,63 +79,47 @@ end context 'when the stream name is not set' do - let(:stream_name) { nil } + let(:args) { [{ serialized_event: '{"foo":"bar"}', partition_key: 'fake_partition_key', stream_name: nil }] } - it 'raises an KeyError error' do - expect { described_class.perform_now(**args) }.to raise_error ArgumentError, 'missing keyword: stream_name' + it 'raises an ArgumentError error' do + expect { described_class.perform_now(*args) }.to raise_error ArgumentError, /missing keyword: :?stream_name/ end end - unless Gem::Version.new(Journaled::VERSION) < Gem::Version.new('5.0.0') + unless Gem::Version.new(Journaled::VERSION) < Gem::Version.new('6.0.0') raise <<~MSG - Hey! I see that you're bumping the version to 5.0! + Hey! I see that you're bumping the version to 6.0! 
This is a reminder to: - - remove the `app_name` argument (and related logic) from `Journaled::DeliveryJob`, - - remove the following app_name test contexts, and - - make `stream_name` a required kwarg + - Remove the `legacy_kwargs` argument (and related logic) from `Journaled::DeliveryJob` + - Remove the related test context below Thanks! MSG end - context 'when the legacy app_name argument is present but nil' do - let(:args) { { serialized_event: serialized_event, partition_key: partition_key, app_name: nil } } - - around do |example| - with_env(JOURNALED_STREAM_NAME: 'legacy_stream_name') { example.run } - end + context 'when supplying legacy kwargs (a single event) instead of a list of events' do + let(:args) { { serialized_event: '{"foo":"bar"}', partition_key: 'fake_partition_key', stream_name: 'test_events' } } it 'makes requests to AWS to put the event on the Kinesis with the correct body' do - event = described_class.perform_now(**args) + events = described_class.perform_now(**args) - expect(event.shard_id).to eq '101' - expect(event.sequence_number).to eq '101123' + expect(events.count).to eq 1 + expect(events.first.shard_id).to eq '101' + expect(events.first.sequence_number).to eq '101123' expect(kinesis_client).to have_received(:put_record).with( - stream_name: 'legacy_stream_name', + stream_name: 'test_events', data: '{"foo":"bar"}', partition_key: 'fake_partition_key', ) end - end - context 'when the legacy app_name argument is present and has a value' do - let(:args) { { serialized_event: serialized_event, partition_key: partition_key, app_name: 'pied_piper' } } + context 'when the stream name is not set' do + let(:args) { { serialized_event: '{"foo":"bar"}', partition_key: 'fake_partition_key', stream_name: nil } } - around do |example| - with_env(PIED_PIPER_JOURNALED_STREAM_NAME: 'pied_piper_events') { example.run } - end - - it 'makes requests to AWS to put the event on the Kinesis with the correct body' do - event = described_class.perform_now(**args) - - expect(event.shard_id).to eq '101' - expect(event.sequence_number).to eq '101123' - expect(kinesis_client).to have_received(:put_record).with( - stream_name: 'pied_piper_events', - data: '{"foo":"bar"}', - partition_key: 'fake_partition_key', - ) + it 'raises an ArgumentError' do + expect { described_class.perform_now(**args) }.to raise_error ArgumentError, /missing keyword: :?stream_name/ + end end end @@ -138,7 +130,7 @@ it 'catches the error and re-raises a subclass of NotTrulyExceptionalError and logs about the failure' do allow(Rails.logger).to receive(:error) - expect { described_class.perform_now(**args) }.to raise_error described_class::KinesisTemporaryFailure + expect { described_class.perform_now(*args) }.to raise_error described_class::KinesisTemporaryFailure expect(Rails.logger).to have_received(:error).with( "Kinesis Error - Server Error occurred - Aws::Kinesis::Errors::InternalFailure", ).once @@ -152,7 +144,7 @@ it 'catches the error and re-raises a subclass of NotTrulyExceptionalError and logs about the failure' do allow(Rails.logger).to receive(:error) - expect { described_class.perform_now(**args) }.to raise_error described_class::KinesisTemporaryFailure + expect { described_class.perform_now(*args) }.to raise_error described_class::KinesisTemporaryFailure expect(Rails.logger).to have_received(:error).with(/\AKinesis Error/).once end end @@ -163,7 +155,7 @@ end it 'raises an error that subclasses Aws::Kinesis::Errors::ServiceError' do - expect { described_class.perform_now(**args) }.to raise_error 
Aws::Kinesis::Errors::ServiceError + expect { described_class.perform_now(*args) }.to raise_error Aws::Kinesis::Errors::ServiceError end end @@ -173,7 +165,7 @@ end it 'raises an AccessDeniedException error' do - expect { described_class.perform_now(**args) }.to raise_error Aws::Kinesis::Errors::AccessDeniedException + expect { described_class.perform_now(*args) }.to raise_error Aws::Kinesis::Errors::AccessDeniedException end end @@ -184,7 +176,7 @@ it 'catches the error and re-raises a subclass of NotTrulyExceptionalError and logs about the failure' do allow(Rails.logger).to receive(:error) - expect { described_class.perform_now(**args) }.to raise_error described_class::KinesisTemporaryFailure + expect { described_class.perform_now(*args) }.to raise_error described_class::KinesisTemporaryFailure expect(Rails.logger).to have_received(:error).with( "Kinesis Error - Networking Error occurred - Seahorse::Client::NetworkingError", ).once @@ -192,24 +184,6 @@ end end - describe ".legacy_computed_stream_name" do - context "when app_name is unspecified" do - it "is fetched from a prefixed ENV var if specified" do - allow(ENV).to receive(:fetch).and_return("expected_stream_name") - expect(described_class.legacy_computed_stream_name(app_name: nil)).to eq("expected_stream_name") - expect(ENV).to have_received(:fetch).with("JOURNALED_STREAM_NAME") - end - end - - context "when app_name is specified" do - it "is fetched from a prefixed ENV var if specified" do - allow(ENV).to receive(:fetch).and_return("expected_stream_name") - expect(described_class.legacy_computed_stream_name(app_name: "my_funky_app_name")).to eq("expected_stream_name") - expect(ENV).to have_received(:fetch).with("MY_FUNKY_APP_NAME_JOURNALED_STREAM_NAME") - end - end - end - describe "#kinesis_client_config" do it "is in us-east-1 by default" do with_env(AWS_DEFAULT_REGION: nil) do diff --git a/spec/lib/journaled/connection_spec.rb b/spec/lib/journaled/connection_spec.rb new file mode 100644 index 0000000..6f70f5c --- /dev/null +++ b/spec/lib/journaled/connection_spec.rb @@ -0,0 +1,45 @@ +require 'rails_helper' + +RSpec.describe Journaled::Connection do + describe '.available?, .stage!' 
do + let(:event_class) { Class.new { include Journaled::Event } } + let(:event) do + instance_double( + event_class, + journaled_schema_name: nil, + journaled_attributes: {}, + journaled_partition_key: '', + journaled_stream_name: nil, + journaled_enqueue_opts: {}, + ) + end + + it 'returns false, and raises an error when events are staged' do + expect(described_class.available?).to be false + expect { described_class.stage!(event) }.to raise_error(Journaled::TransactionSafetyError) + end + + context 'when inside of a transaction' do + it 'returns true, and allows for staging events' do + ActiveRecord::Base.transaction do + expect(described_class.available?).to be true + expect { described_class.stage!(event) }.not_to raise_error + end + end + + context 'when transactional batching is disabled' do + around do |example| + Journaled.transactional_batching_enabled = false + example.run + ensure + Journaled.transactional_batching_enabled = true + end + + it 'returns false, and raises an error when events are staged' do + expect(described_class.available?).to be false + expect { described_class.stage!(event) }.to raise_error(Journaled::TransactionSafetyError) + end + end + end + end +end diff --git a/spec/models/journaled/writer_spec.rb b/spec/models/journaled/writer_spec.rb index 9c5ff7d..75e25ff 100644 --- a/spec/models/journaled/writer_spec.rb +++ b/spec/models/journaled/writer_spec.rb @@ -208,5 +208,251 @@ end end end + + context 'when inside of a transaction' do + def fake_event(num) + instance_double( + event_class, + journaled_schema_name: 'fake_schema_name', + journaled_attributes: { id: "FAKE_UUID_#{num}", event_type: "fake_event_#{num}", created_at: Time.zone.now, foo: :bar }, + journaled_partition_key: 'fake_partition_key', + journaled_stream_name: 'my_app_events', + journaled_enqueue_opts: journaled_enqueue_opts, + tagged?: false, + ) + end + + it 'batches multiple events and does not enqueue until the end of a transaction' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + expect { described_class.new(journaled_event: fake_event(2)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + }.to change { enqueued_jobs.count }.from(0).to(1) + .and journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + + context 'when a transaction rolls back' do + it 'batches multiple events and does not enqueue until the end of a transaction' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + expect { described_class.new(journaled_event: fake_event(2)).journal! 
} + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + raise ActiveRecord::Rollback + end + }.to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + + # Make sure that prior transaction does not impact behavior of future transactions: + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(3)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + end + }.to change { enqueued_jobs.count }.from(0).to(1) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + .and journal_event_including(id: 'FAKE_UUID_3', event_type: 'fake_event_3') + end + + context 'when there is a nested transaction with events' do + it 'emits the events enqueued within the savepoint transaction within a separate batch' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(2)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + }.to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + end + }.to change { enqueued_jobs.count }.from(0).to(1) + .and journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + + context 'and the inner transaction rolls back' do + it 'does not emit the events enqueued within the savepoint transaction' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(2)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + raise 'ohno' + end + end + }.to raise_error('ohno') + .and not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + end + + context 'and the outer transaction rolls back' do + it 'does not emit any events' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(2)).journal! 
} + .to not_change { enqueued_jobs.count }.from(0) + end + }.to not_change { enqueued_jobs.count }.from(0) + raise 'ohno' + end + }.to raise_error('ohno') + .and not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + end + end + + context 'when there is a savepoint transaction with events' do + it 'emits the events enqueued within the savepoint transaction within a separate batch' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + expect { + ActiveRecord::Base.transaction(requires_new: true) do + expect { described_class.new(journaled_event: fake_event(2)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + }.to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + end + }.to change { enqueued_jobs.count }.from(0).to(1) + .and journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + + context 'and the inner transaction rolls back' do + it 'does not emit the events enqueued within the savepoint transaction' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + expect { + ActiveRecord::Base.transaction(requires_new: true) do + expect { described_class.new(journaled_event: fake_event(2)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + raise ActiveRecord::Rollback + end + }.to not_change { enqueued_jobs.count }.from(0) + end + }.to change { enqueued_jobs.count }.from(0).to(1) + .and journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + end + + context 'and the outer transaction rolls back' do + it 'does not emit any events' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + expect { + ActiveRecord::Base.transaction(requires_new: true) do + expect { described_class.new(journaled_event: fake_event(2)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + end + }.to not_change { enqueued_jobs.count }.from(0) + raise ActiveRecord::Rollback + end + }.to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and not_journal_event_including(id: 'FAKE_UUID_2', event_type: 'fake_event_2') + end + end + end + end + + context 'when transactional batching is disabled' do + around do |example| + Journaled.transactional_batching_enabled = false + example.run + ensure + Journaled.transactional_batching_enabled = true + end + + it 'does not batch the events, and enqueues them as they are journaled' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(1)).journal! 
} + .to change { enqueued_jobs.count }.from(0).to(1) + .and journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + expect { described_class.new(journaled_event: fake_event(5)).journal! } + .to change { enqueued_jobs.count }.from(1).to(2) + .and journal_event_including(id: 'FAKE_UUID_5', event_type: 'fake_event_5') + end + }.to change { enqueued_jobs.count }.from(0).to(2) + .and journal_event_including(id: 'FAKE_UUID_1', event_type: 'fake_event_1') + .and journal_event_including(id: 'FAKE_UUID_5', event_type: 'fake_event_5') + end + end + + context 'when an event is enqueued in its own before_commit callback' do + before do + before_commit_callback = -> { described_class.new(journaled_event: fake_event(9)).journal! } + klass = Class.new(ActiveRecord::Base) { self.table_name = 'widgets' } + klass.before_commit { before_commit_callback.call } + stub_const('Widget', klass) + end + + it 'still enqeues the event' do + expect { + ActiveRecord::Base.transaction do + expect { Widget.create! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_9', event_type: 'fake_event_9') + end + }.to change { enqueued_jobs.count }.from(0).to(1) + .and journal_event_including(id: 'FAKE_UUID_9', event_type: 'fake_event_9') + end + + context 'when events were already batched up' do + it 'enqueues the event in a new batch (because TransactionHandler#joinable? is false)' do + expect { + ActiveRecord::Base.transaction do + expect { described_class.new(journaled_event: fake_event(7)).journal! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_7', event_type: 'fake_event_7') + expect { Widget.create! } + .to not_change { enqueued_jobs.count }.from(0) + .and not_journal_event_including(id: 'FAKE_UUID_9', event_type: 'fake_event_9') + end + }.to change { enqueued_jobs.count }.from(0).to(1) + .and journal_event_including(id: 'FAKE_UUID_7', event_type: 'fake_event_7') + .and journal_event_including(id: 'FAKE_UUID_9', event_type: 'fake_event_9') + end + end + end + end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6cf1224..a5b1367 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -11,6 +11,8 @@ load File.expand_path('dummy/db/schema.rb', __dir__) end +RSpec::Matchers.define_negated_matcher :not_change, :change + RSpec.configure do |config| config.expect_with :rspec do |expectations| expectations.include_chain_clauses_in_custom_matcher_descriptions = true diff --git a/spec/support/environment_spec_helper.rb b/spec/support/environment_spec_helper.rb index 8a69ff7..12bd4ca 100644 --- a/spec/support/environment_spec_helper.rb +++ b/spec/support/environment_spec_helper.rb @@ -4,7 +4,7 @@ def with_env(opts = {}) opts.each do |k, v| k = k.to_s v = v.to_s unless v.nil? - old[k] = ENV[k] + old[k] = ENV.fetch(k, nil) ENV[k] = v end yield