Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions lib/qless/worker/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,24 +195,19 @@ def on_current_job_lock_lost(&block)
@on_current_job_lock_lost = block
end

def listen_for_lost_lock
def listen_for_lost_lock(job)
# Ensure subscribers always has a value
subscribers = []
subscribers = uniq_clients.map do |client|
Subscriber.start(client, "ql:w:#{client.worker_name}", log: @log) do |_, message|
if message['event'] == 'lock_lost'
with_current_job do |job|
if job && message['jid'] == job.jid
@on_current_job_lock_lost.call(job)
end
end
subscriber = Subscriber.start(job.client, "ql:w:#{job.client.worker_name}", log: @log) do |_, message|
if message['event'] == 'lock_lost'
if message['jid'] == job.jid
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could combine ifs here

@on_current_job_lock_lost.call(job)
end
end
end

yield
ensure
subscribers.each(&:stop)
subscriber && subscriber.stop
end

private
Expand Down
24 changes: 12 additions & 12 deletions lib/qless/worker/serial.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ def run

reserver.prep_for_work!

listen_for_lost_lock do
procline "Running #{reserver.description}"
procline "Running #{reserver.description}"

jobs.each do |job|
# Run the job we're working on
log(:debug, "Starting job #{job.klass_name} (#{job.jid} from #{job.queue_name})")
procline "Processing #{job.description}"
jobs.each do |job|
# Run the job we're working on
log(:debug, "Starting job #{job.klass_name} (#{job.jid} from #{job.queue_name})")
procline "Processing #{job.description}"
listen_for_lost_lock(job) do
perform(job)
log(:debug, "Finished job #{job.klass_name} (#{job.jid} from #{job.queue_name})")
end
log(:debug, "Finished job #{job.klass_name} (#{job.jid} from #{job.queue_name})")

# So long as we're paused, we should wait
while paused
log(:debug, 'Paused...')
sleep interval
end
# So long as we're paused, we should wait
while paused
log(:debug, 'Paused...')
sleep interval
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion spec/integration/workers/serial_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,11 @@ def self.perform(job)
callback_invoked = false
worker.on_current_job_lock_lost { callback_invoked = true }

queue.put('JobClass', {})
queue.put('JobClass', {})

worker.listen_for_lost_lock do
job = queue.pop
worker.listen_for_lost_lock(job) do
queue.pop.timeout
end

Expand Down