Skip to content

Commit

Permalink
Merge branch 'patch-4' of git://github.com/dimko/sidekiq into dimko-p…
Browse files Browse the repository at this point in the history
…atch-4
  • Loading branch information
mperham committed May 25, 2013
2 parents f3f8760 + c7828f1 commit 3dda03d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 30 deletions.
2 changes: 1 addition & 1 deletion lib/sidekiq/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def retry
results.map do |message|
msg = Sidekiq.load_json(message)
msg['retry_count'] = msg['retry_count'] - 1
conn.lpush("queue:#{msg['queue']}", Sidekiq.dump_json(msg))
Sidekiq::Client.push(msg)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def normalize_item(item)
normalized_item = Sidekiq::Worker::ClassMethods::DEFAULT_OPTIONS.merge(item)
end

normalized_item['jid'] = SecureRandom.hex(12)
normalized_item['jid'] ||= SecureRandom.hex(12)
normalized_item['enqueued_at'] = Time.now.to_f
normalized_item
end
Expand Down
5 changes: 1 addition & 4 deletions lib/sidekiq/scheduled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ def poll(first_time=false)
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, message)
conn.multi do
conn.sadd('queues', msg['queue'])
conn.lpush("queue:#{msg['queue']}", Sidekiq.dump_json(msg))
end
Sidekiq::Client.push(msg)
logger.debug { "enqueued #{sorted_set}: #{message}" }
end
end
Expand Down
82 changes: 58 additions & 24 deletions test/test_scheduled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,72 @@ def perform(x)
Sidekiq.redis do |conn|
conn.flushdb
end

@error_1 = { 'class' => ScheduledWorker.name, 'args' => [0], 'queue' => 'queue_1' }
@error_2 = { 'class' => ScheduledWorker.name, 'args' => [1], 'queue' => 'queue_2' }
@error_3 = { 'class' => ScheduledWorker.name, 'args' => [2], 'queue' => 'queue_3' }
@future_1 = { 'class' => ScheduledWorker.name, 'args' => [3], 'queue' => 'queue_4' }
@future_2 = { 'class' => ScheduledWorker.name, 'args' => [4], 'queue' => 'queue_5' }
@future_3 = { 'class' => ScheduledWorker.name, 'args' => [5], 'queue' => 'queue_6' }

@retry = Sidekiq::RetrySet.new
@scheduled = Sidekiq::ScheduledSet.new
@poller = Sidekiq::Scheduled::Poller.new
end

class Stopper
def call(worker_class, message, queue)
yield if message['args'].first.odd?
end
end

it 'executes client middleware' do
Sidekiq.client_middleware.add Stopper
begin
@retry.schedule (Time.now - 60).to_f, @error_1
@retry.schedule (Time.now - 60).to_f, @error_2
@scheduled.schedule (Time.now - 60).to_f, @future_2
@scheduled.schedule (Time.now - 60).to_f, @future_3

@poller.poll

Sidekiq.redis do |conn|
assert_equal 0, conn.llen("queue:queue_1")
assert_equal 1, conn.llen("queue:queue_2")
assert_equal 0, conn.llen("queue:queue_5")
assert_equal 1, conn.llen("queue:queue_6")
end
ensure
Sidekiq.client_middleware.remove Stopper
end
end

it 'should empty the retry and scheduled queues up to the current time' do
enqueued_time = Time.new(2013, 2, 4)

Time.stub(:now, enqueued_time) do
@retry.schedule (Time.now - 60).to_f, @error_1
@retry.schedule (Time.now - 50).to_f, @error_2
@retry.schedule (Time.now + 60).to_f, @error_3
@scheduled.schedule (Time.now - 60).to_f, @future_1
@scheduled.schedule (Time.now - 50).to_f, @future_2
@scheduled.schedule (Time.now + 60).to_f, @future_3

@poller.poll

Sidekiq.redis do |conn|
error_1 = { 'class' => ScheduledWorker.name, 'args' => ["error_1"], 'queue' => 'queue_1' }
error_2 = { 'class' => ScheduledWorker.name, 'args' => ["error_2"], 'queue' => 'queue_2' }
error_3 = { 'class' => ScheduledWorker.name, 'args' => ["error_3"], 'queue' => 'queue_3' }
future_1 = { 'class' => ScheduledWorker.name, 'args' => ["future_1"], 'queue' => 'queue_4' }
future_2 = { 'class' => ScheduledWorker.name, 'args' => ["future_2"], 'queue' => 'queue_5' }
future_3 = { 'class' => ScheduledWorker.name, 'args' => ["future_3"], 'queue' => 'queue_6' }

conn.zadd("retry", (Time.now - 60).to_f.to_s, Sidekiq.dump_json(error_1))
conn.zadd("retry", (Time.now - 50).to_f.to_s, Sidekiq.dump_json(error_2))
conn.zadd("retry", (Time.now + 60).to_f.to_s, Sidekiq.dump_json(error_3))
conn.zadd("schedule", (Time.now - 60).to_f.to_s, Sidekiq.dump_json(future_1))
conn.zadd("schedule", (Time.now - 50).to_f.to_s, Sidekiq.dump_json(future_2))
conn.zadd("schedule", (Time.now + 60).to_f.to_s, Sidekiq.dump_json(future_3))

poller = Sidekiq::Scheduled::Poller.new
poller.poll

assert_equal [Sidekiq.dump_json(error_1.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_1", 0, -1)
assert_equal [Sidekiq.dump_json(error_2.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_2", 0, -1)
assert_equal [Sidekiq.dump_json(future_1.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_4", 0, -1)
assert_equal [Sidekiq.dump_json(future_2.merge('enqueued_at' => enqueued_time.to_f))], conn.lrange("queue:queue_5", 0, -1)

assert_equal [Sidekiq.dump_json(error_3)], conn.zrange("retry", 0, -1)
assert_equal [Sidekiq.dump_json(future_3)], conn.zrange("schedule", 0, -1)
assert_equal 1, conn.llen("queue:queue_1")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_1", 0, -1)[0])['enqueued_at']
assert_equal 1, conn.llen("queue:queue_2")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_2", 0, -1)[0])['enqueued_at']
assert_equal 1, conn.llen("queue:queue_4")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_4", 0, -1)[0])['enqueued_at']
assert_equal 1, conn.llen("queue:queue_5")
assert_equal enqueued_time.to_f, Sidekiq.load_json(conn.lrange("queue:queue_5", 0, -1)[0])['enqueued_at']
end

assert_equal 1, @retry.size
assert_equal 1, @scheduled.size
end
end
end
Expand Down

0 comments on commit 3dda03d

Please sign in to comment.