Skip to content

Commit

Permalink
Remove 'at' property so it doesn't confuse Client
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed May 25, 2013
1 parent 3dda03d commit c2f09b7
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
HEAD
-----------

- Scheduled and Retry jobs now use Sidekiq::Client to push
jobs onto the queue, so they use client middleware. [dimko, #948]
- Record the timestamp when jobs are enqueued. Add
Sidekiq::Job#enqueued\_at to query the time. [mariovisic, #944]
- Add Sidekiq::Queue#latency - calculates diff between now and
Expand Down
5 changes: 4 additions & 1 deletion lib/sidekiq/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ def raw_push(payloads)
pushed = false
Sidekiq.redis do |conn|
if payloads.first['at']
pushed = conn.zadd('schedule', payloads.map {|hash| [hash['at'].to_s, Sidekiq.dump_json(hash)]})
pushed = conn.zadd('schedule', payloads.map do |hash|
at = hash.delete('at').to_s
[at, Sidekiq.dump_json(hash)]
end)
else
q = payloads.first['queue']
to_push = payloads.map { |entry| Sidekiq.dump_json(entry) }
Expand Down
3 changes: 1 addition & 2 deletions lib/sidekiq/scheduled.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ def poll(first_time=false)
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
msg = Sidekiq.load_json(message).tap { |msg| msg['enqueued_at'] = Time.now.to_f }

# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, message)
Sidekiq::Client.push(msg)
Sidekiq::Client.push(Sidekiq.load_json(message))
logger.debug { "enqueued #{sorted_set}: #{message}" }
end
end
Expand Down
1 change: 0 additions & 1 deletion test/test_web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ def add_scheduled
score = Time.now.to_f
msg = { 'class' => 'HardWorker',
'args' => ['bob', 1, Time.now.to_f],
'at' => score,
'jid' => 'f39af2a05e8f4b24dbc0f1e4' }
Sidekiq.redis do |conn|
conn.zadd('schedule', score, Sidekiq.dump_json(msg))
Expand Down

0 comments on commit c2f09b7

Please sign in to comment.