Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updated qless core
  • Loading branch information
james-lawrence committed Jun 3, 2014
commit abe5b047f9324a07c29b326a3d35aa814e2651ee
119 changes: 66 additions & 53 deletions lib/qless/lua/qless-lib.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Current SHA: 3108245a22bf30415f9f3db85059d238ef35c4b0
-- Current SHA: 9a766e606d5115cb1a2a78f6e2b28be5cf5e121d
-- This is a generated file
-------------------------------------------------------------------------------
-- Forward declarations to make everything happy
Expand Down Expand Up @@ -74,12 +74,6 @@ end
function Qless.throttle(tid)
assert(tid, 'Throttle(): no tid provided')
local throttle = QlessThrottle.data({id = tid})
if not throttle then
throttle = {
id = tid,
maximum = 0
}
end
setmetatable(throttle, QlessThrottle)

-- set of jids which have acquired a lock on this throttle.
Expand Down Expand Up @@ -326,8 +320,7 @@ function Qless.tag(now, command, ...)
_tags[tag] = true
table.insert(tags, tag)
end
redis.call('zadd', 'ql:t:' .. tag, now, jid)
redis.call('zincrby', 'ql:tags', 1, tag)
Qless.job(jid):insert_tag(now, tag)
end

tags = cjson.encode(tags)
Expand All @@ -350,8 +343,7 @@ function Qless.tag(now, command, ...)
for i=2,#arg do
local tag = arg[i]
_tags[tag] = nil
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
Qless.job(jid):remove_tag(tag)
end

local results = {}
Expand Down Expand Up @@ -436,7 +428,9 @@ function Qless.cancel(now, ...)
queue:remove_job(jid)
end

Qless.job(jid):throttles_release(now)
local job = Qless.job(jid)

job:throttles_release(now)

-- We should probably go through all our dependencies and remove
-- ourselves from the list of dependents
Expand All @@ -445,9 +439,6 @@ function Qless.cancel(now, ...)
redis.call('srem', QlessJob.ns .. j .. '-dependents', jid)
end

-- Delete any notion of dependencies it has
redis.call('del', QlessJob.ns .. jid .. '-dependencies')

-- If we're in the failed state, remove all of our data
if state == 'failed' then
failure = cjson.decode(failure)
Expand All @@ -465,22 +456,12 @@ function Qless.cancel(now, ...)
'ql:s:stats:' .. bin .. ':' .. queue, 'failed', failed - 1)
end

-- Remove it as a job that's tagged with this particular tag
local tags = cjson.decode(
redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
for i, tag in ipairs(tags) do
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
end
job:delete()

-- If the job was being tracked, we should notify
if redis.call('zscore', 'ql:tracked', jid) ~= false then
Qless.publish('canceled', jid)
end

-- Just go ahead and delete our data
redis.call('del', QlessJob.ns .. jid)
redis.call('del', QlessJob.ns .. jid .. '-history')
end
end

Expand Down Expand Up @@ -784,29 +765,16 @@ function QlessJob:complete(now, worker, queue, data, ...)
local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time)
-- Any jobs that need to be expired... delete
for index, jid in ipairs(jids) do
local tags = cjson.decode(
redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
for i, tag in ipairs(tags) do
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
end
redis.call('del', QlessJob.ns .. jid)
redis.call('del', QlessJob.ns .. jid .. '-history')
Qless.job(jid):delete()
end

-- And now remove those from the queued-for-cleanup queue
redis.call('zremrangebyscore', 'ql:completed', 0, now - time)

-- Now take the all by the most recent 'count' ids
jids = redis.call('zrange', 'ql:completed', 0, (-1-count))
for index, jid in ipairs(jids) do
local tags = cjson.decode(
redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}')
for i, tag in ipairs(tags) do
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
end
redis.call('del', QlessJob.ns .. jid)
redis.call('del', QlessJob.ns .. jid .. '-history')
Qless.job(jid):delete()
end
redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count))

Expand Down Expand Up @@ -1388,6 +1356,50 @@ function QlessJob:throttles()

return self._throttles
end

-- Completely removes all the data
-- associated with this job, use
-- with care.
function QlessJob:delete()
local tags = redis.call('hget', QlessJob.ns .. self.jid, 'tags') or '[]'
tags = cjson.decode(tags)
-- remove the jid from each tag
for i, tag in ipairs(tags) do
self:remove_tag(tag)
end
-- Delete the job's data
redis.call('del', QlessJob.ns .. self.jid)
-- Delete the job's history
redis.call('del', QlessJob.ns .. self.jid .. '-history')
-- Delete any notion of dependencies it has
redis.call('del', QlessJob.ns .. self.jid .. '-dependencies')
end

-- Inserts the jid into the specified tag.
-- This should probably be moved to its own tag
-- object.
function QlessJob:insert_tag(now, tag)
redis.call('zadd', 'ql:t:' .. tag, now, self.jid)
redis.call('zincrby', 'ql:tags', 1, tag)
end

-- Removes the jid from the specified tag.
-- this should probably be moved to its own tag
-- object.
function QlessJob:remove_tag(tag)
-- Remove the job from the specified tag
redis.call('zrem', 'ql:t:' .. tag, self.jid)

-- Decrement the tag in the set of all tags.
local score = redis.call('zincrby', 'ql:tags', -1, tag)

-- if the score for the specified tag is 0
-- it means we have no jobs with this tag anymore
-- and we should remove it from the set to prevent memory leaks.
if tonumber(score) == 0 then
redis.call('zrem', 'ql:tags', tag)
end
end
-------------------------------------------------------------------------------
-- Queue class
-------------------------------------------------------------------------------
Expand Down Expand Up @@ -1940,8 +1952,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...)

-- Add this job to the list of jobs tagged with whatever tags were supplied
for i, tag in ipairs(tags) do
redis.call('zadd', 'ql:t:' .. tag, now, jid)
redis.call('zincrby', 'ql:tags', 1, tag)
Qless.job(jid):insert_tag(now, tag)
end

-- If we're in the failed state, remove all of our data
Expand Down Expand Up @@ -2210,8 +2221,7 @@ function QlessQueue:check_recurring(now, count)
-- Add this job to the list of jobs tagged with whatever tags were
-- supplied
for i, tag in ipairs(_tags) do
redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. count)
redis.call('zincrby', 'ql:tags', 1, tag)
Qless.job(jid .. '-' .. count):insert_tag(now, tag)
end

-- First, let's save its data
Expand Down Expand Up @@ -2651,16 +2661,19 @@ function QlessWorker.counts(now, worker)
end
-- Retrieve the data fro a throttled resource
function QlessThrottle:data()
-- Default values for the data
local data = {
id = self.id,
maximum = 0
}

-- Retrieve data stored in redis
local throttle = redis.call('hmget', QlessThrottle.ns .. self.id, 'id', 'maximum')
-- Return default if it doesn't exist
if not throttle[1] then
return {id = self.id, maximum = 0}

if throttle[2] then
data.maximum = tonumber(throttle[2])
end

local data = {
id = throttle[1],
maximum = tonumber(throttle[2])
}
return data
end

Expand Down
Loading