Skip to content

Commit

Permalink
Add a config for enforcing a minimum job reserve interval (default: 0…
Browse files Browse the repository at this point in the history
… for now) (#48)

This is related to #41 and #42 insofar as a kind of DB-bound "spinloop"
is still possible if a worker picks up jobs that take so little time
that the worker immediately turns around and asks for more. As of now,
there has been no way to tune the amount of time a worker should wait in
between _successful_ iterations of its run loop.

This introduces a configuration (`min_reserve_interval`) specifying a
minimum number of seconds (default: 0 as this is not a major release)
that a worker should wait in between _successful_ job reserve queries.
An existing config (`sleep_delay`) is still used to define the number of
seconds (default: 5) that a worker should wait in between _unsuccessful_
job reserve attempts (i.e. the queue is empty).

The job execution time is subtracted from `min_reserve_interval` when
the worker sleeps, and if jobs take more than `min_reserve_interval` to
complete than the worker will not sleep before the next reserve query.

/no-platform
  • Loading branch information
smudge authored Dec 18, 2024
1 parent c0c36c0 commit c8ac357
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
delayed (0.5.5)
delayed (0.6.0)
activerecord (>= 5.2)
concurrent-ruby

Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ Delayed::Worker.read_ahead = 5

# If a worker finds no jobs, it will sleep this number of seconds in between attempts:
Delayed::Worker.sleep_delay = 5

# Until version 1.0, the worker will not sleep at all between attemps if it finds jobs.
# This can be configured by setting the minimum reserve interval:
Delayed::Worker.min_reserve_interval = 0.5 # seconds
```

If a job fails, it will be rerun up to 25 times (with an exponential back-off). Jobs will also
Expand Down
2 changes: 1 addition & 1 deletion delayed.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ['lib']
spec.summary = 'a multi-threaded, SQL-driven ActiveJob backend used at Betterment to process millions of background jobs per day'

spec.version = '0.5.5'
spec.version = '0.6.0'
spec.metadata = {
'changelog_uri' => 'https://github.com/betterment/delayed/blob/main/CHANGELOG.md',
'bug_tracker_uri' => 'https://github.com/betterment/delayed/issues',
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def start
def on_exit!; end

def interruptable_sleep(seconds)
pipe[0].wait_readable(seconds)
pipe[0].wait_readable(seconds) if seconds.positive?
end

def stop
Expand Down
9 changes: 9 additions & 0 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Worker
include Runnable

cattr_accessor :sleep_delay, instance_writer: false, default: 5
cattr_accessor :min_reserve_interval, instance_writer: false, default: 0
cattr_accessor :max_attempts, instance_writer: false, default: 25
cattr_accessor :max_claims, instance_writer: false, default: 5
cattr_accessor :max_run_time, instance_writer: false, default: 20.minutes
Expand Down Expand Up @@ -92,6 +93,7 @@ def work_off(num = 100)
total = 0

while total < num
start = clock_time
jobs = reserve_jobs
break if jobs.empty?

Expand All @@ -107,6 +109,9 @@ def work_off(num = 100)
pool.wait_for_termination

break if stop? # leave if we're exiting

elapsed = clock_time - start
interruptable_sleep(self.class.min_reserve_interval - elapsed)
end

[success.value, total - success.value]
Expand Down Expand Up @@ -227,5 +232,9 @@ def reserve_jobs
def reload!
Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check!
end

def clock_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
4 changes: 2 additions & 2 deletions spec/delayed/tasks_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def stub_env(key, value)
.to change { Delayed::Worker.min_priority }.from(nil).to(6)
.and change { Delayed::Worker.max_priority }.from(nil).to(8)
.and change { Delayed::Worker.queues }.from([]).to(%w(foo bar))
.and change { Delayed::Worker.sleep_delay }.from(5).to(1)
.and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1)
.and change { Delayed::Worker.read_ahead }.from(5).to(3)
.and change { Delayed::Worker.max_claims }.from(5).to(3)
end
Expand Down Expand Up @@ -96,7 +96,7 @@ def stub_env(key, value)
.to change { Delayed::Worker.min_priority }.from(nil).to(6)
.and change { Delayed::Worker.max_priority }.from(nil).to(8)
.and change { Delayed::Worker.queues }.from([]).to(%w(foo))
.and change { Delayed::Worker.sleep_delay }.from(5).to(1)
.and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1)
.and change { Delayed::Worker.read_ahead }.from(5).to(3)
.and change { Delayed::Worker.max_claims }.from(5).to(3)
end
Expand Down
14 changes: 14 additions & 0 deletions spec/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class SingletonClass
include Singleton
end

# Negative values are treated as sleep(0),
# so we can use different values to test the sleep behavior:
TEST_MIN_RESERVE_INTERVAL = -10
TEST_SLEEP_DELAY = -100

RSpec.configure do |config|
config.around(:each) do |example|
aj_priority_was = ActiveJob::Base.priority
Expand All @@ -113,6 +118,14 @@ class SingletonClass
queues_was = Delayed::Worker.queues
read_ahead_was = Delayed::Worker.read_ahead
sleep_delay_was = Delayed::Worker.sleep_delay
min_reserve_interval_was = Delayed::Worker.min_reserve_interval

if Gem.loaded_specs['delayed'].version >= Gem::Version.new('1.0') && min_reserve_interval_was.zero?
raise "Min reserve interval should be nonzero in v1.0 release"
end

Delayed::Worker.sleep_delay = TEST_SLEEP_DELAY
Delayed::Worker.min_reserve_interval = TEST_MIN_RESERVE_INTERVAL

example.run
ensure
Expand All @@ -130,6 +143,7 @@ class SingletonClass
Delayed::Worker.queues = queues_was
Delayed::Worker.read_ahead = read_ahead_was
Delayed::Worker.sleep_delay = sleep_delay_was
Delayed::Worker.min_reserve_interval = min_reserve_interval_was

Delayed::Job.delete_all
end
Expand Down
106 changes: 57 additions & 49 deletions spec/worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
require 'helper'

describe Delayed::Worker do
before do
described_class.sleep_delay = 0
end

describe 'start' do
it 'runs the :execute lifecycle hook' do
performances = []
Expand Down Expand Up @@ -32,62 +28,74 @@
allow(subject).to receive(:interruptable_sleep).and_call_original
end

context 'when there are no jobs' do
before do
allow(Delayed::Job).to receive(:reserve).and_return([])
end
around do |example|
max_claims_was = described_class.max_claims
described_class.max_claims = max_claims
example.run
ensure
described_class.max_claims = max_claims_was
end

it 'does not log and then sleeps' do
before do
allow(Delayed::Job).to receive(:reserve).and_return((0...jobs_returned).map { job }, [])
end

let(:max_claims) { 1 }
let(:jobs_returned) { 1 }
let(:job) do
instance_double(
Delayed::Job,
id: 123,
max_run_time: 10,
name: 'MyJob',
run_at: Delayed::Job.db_time_now,
created_at: Delayed::Job.db_time_now,
priority: Delayed::Priority.interactive,
queue: 'testqueue',
attempts: 0,
invoke_job: true,
destroy: true,
)
end

it 'logs the count and sleeps only within the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end

context 'when no jobs are returned' do
let(:jobs_returned) { 0 }

it 'does not log and then sleeps only outside of the loop' do
subject.run!
expect(Delayed.logger).not_to have_received(:info)
expect(subject).to have_received(:interruptable_sleep)
expect(subject).to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end
end

context 'when there is a job worked off' do
around do |example|
max_claims_was = described_class.max_claims
described_class.max_claims = max_claims
example.run
ensure
described_class.max_claims = max_claims_was
end

before do
allow(Delayed::Job).to receive(:reserve).and_return([job], [])
end

let(:max_claims) { 1 }
let(:job) do
instance_double(
Delayed::Job,
id: 123,
max_run_time: 10,
name: 'MyJob',
run_at: Delayed::Job.db_time_now,
created_at: Delayed::Job.db_time_now,
priority: Delayed::Priority.interactive,
queue: 'testqueue',
attempts: 0,
invoke_job: true,
destroy: true,
)
end
context 'when max_claims is 3 and 3 jobs are returned' do
let(:max_claims) { 3 }
let(:jobs_returned) { 3 }

it 'logs the count and does not sleep' do
it 'logs the count and sleeps only in the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).not_to have_received(:interruptable_sleep)
expect(Delayed.logger).to have_received(:info).with(/3 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end
end

context 'when max_claims is 2' do
let(:max_claims) { 2 }
context 'when max_claims is 3 and 2 jobs are returned' do
let(:max_claims) { 3 }
let(:jobs_returned) { 2 }

it 'logs the count and sleeps' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).to have_received(:interruptable_sleep)
end
it 'logs the count and sleeps both in the loop and outside of the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/2 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).to have_received(:interruptable_sleep).once.with(TEST_SLEEP_DELAY)
end
end
end
Expand Down

0 comments on commit c8ac357

Please sign in to comment.