diff --git a/Changes.md b/Changes.md index 32deddd5f..36dbd8b80 100644 --- a/Changes.md +++ b/Changes.md @@ -1,6 +1,8 @@ HEAD ----------- +- Scheduled and Retry jobs now use Sidekiq::Client to push + jobs onto the queue, so they use client middleware. [dimko, #948] - Record the timestamp when jobs are enqueued. Add Sidekiq::Job#enqueued\_at to query the time. [mariovisic, #944] - Add Sidekiq::Queue#latency - calculates diff between now and diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 9870110cb..a5ac66863 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -94,7 +94,10 @@ def raw_push(payloads) pushed = false Sidekiq.redis do |conn| if payloads.first['at'] - pushed = conn.zadd('schedule', payloads.map {|hash| [hash['at'].to_s, Sidekiq.dump_json(hash)]}) + pushed = conn.zadd('schedule', payloads.map do |hash| + at = hash.delete('at').to_s + [at, Sidekiq.dump_json(hash)] + end) else q = payloads.first['queue'] to_push = payloads.map { |entry| Sidekiq.dump_json(entry) } diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 53574f3b4..a5b824ea3 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -33,13 +33,12 @@ def poll(first_time=false) # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do - msg = Sidekiq.load_json(message).tap { |msg| msg['enqueued_at'] = Time.now.to_f } # Pop item off the queue and add it to the work queue. If the job can't be popped from # the queue, it's because another process already popped it so we can move on to the # next one. if conn.zrem(sorted_set, message) - Sidekiq::Client.push(msg) + Sidekiq::Client.push(Sidekiq.load_json(message)) logger.debug { "enqueued #{sorted_set}: #{message}" } end end diff --git a/test/test_web.rb b/test/test_web.rb index 57db55350..ab54bf3dc 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -264,7 +264,6 @@ def add_scheduled score = Time.now.to_f msg = { 'class' => 'HardWorker', 'args' => ['bob', 1, Time.now.to_f], - 'at' => score, 'jid' => 'f39af2a05e8f4b24dbc0f1e4' } Sidekiq.redis do |conn| conn.zadd('schedule', score, Sidekiq.dump_json(msg))