diff --git a/lib/qless/lua/qless-lib.lua b/lib/qless/lua/qless-lib.lua index 03799ad0..08f59dfe 100644 --- a/lib/qless/lua/qless-lib.lua +++ b/lib/qless/lua/qless-lib.lua @@ -1,4 +1,4 @@ --- Current SHA: 3108245a22bf30415f9f3db85059d238ef35c4b0 +-- Current SHA: 9a766e606d5115cb1a2a78f6e2b28be5cf5e121d -- This is a generated file ------------------------------------------------------------------------------- -- Forward declarations to make everything happy @@ -74,12 +74,6 @@ end function Qless.throttle(tid) assert(tid, 'Throttle(): no tid provided') local throttle = QlessThrottle.data({id = tid}) - if not throttle then - throttle = { - id = tid, - maximum = 0 - } - end setmetatable(throttle, QlessThrottle) -- set of jids which have acquired a lock on this throttle. @@ -326,8 +320,7 @@ function Qless.tag(now, command, ...) _tags[tag] = true table.insert(tags, tag) end - redis.call('zadd', 'ql:t:' .. tag, now, jid) - redis.call('zincrby', 'ql:tags', 1, tag) + Qless.job(jid):insert_tag(now, tag) end tags = cjson.encode(tags) @@ -350,8 +343,7 @@ function Qless.tag(now, command, ...) for i=2,#arg do local tag = arg[i] _tags[tag] = nil - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) + Qless.job(jid):remove_tag(tag) end local results = {} @@ -436,7 +428,9 @@ function Qless.cancel(now, ...) queue:remove_job(jid) end - Qless.job(jid):throttles_release(now) + local job = Qless.job(jid) + + job:throttles_release(now) -- We should probably go through all our dependencies and remove -- ourselves from the list of dependents @@ -445,9 +439,6 @@ function Qless.cancel(now, ...) redis.call('srem', QlessJob.ns .. j .. '-dependents', jid) end - -- Delete any notion of dependencies it has - redis.call('del', QlessJob.ns .. jid .. '-dependencies') - -- If we're in the failed state, remove all of our data if state == 'failed' then failure = cjson.decode(failure) @@ -465,22 +456,12 @@ function Qless.cancel(now, ...) 'ql:s:stats:' .. bin .. ':' .. queue, 'failed', failed - 1) end - -- Remove it as a job that's tagged with this particular tag - local tags = cjson.decode( - redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') - for i, tag in ipairs(tags) do - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) - end + job:delete() -- If the job was being tracked, we should notify if redis.call('zscore', 'ql:tracked', jid) ~= false then Qless.publish('canceled', jid) end - - -- Just go ahead and delete our data - redis.call('del', QlessJob.ns .. jid) - redis.call('del', QlessJob.ns .. jid .. '-history') end end @@ -784,29 +765,16 @@ function QlessJob:complete(now, worker, queue, data, ...) local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time) -- Any jobs that need to be expired... delete for index, jid in ipairs(jids) do - local tags = cjson.decode( - redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') - for i, tag in ipairs(tags) do - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) - end - redis.call('del', QlessJob.ns .. jid) - redis.call('del', QlessJob.ns .. jid .. '-history') + Qless.job(jid):delete() end + -- And now remove those from the queued-for-cleanup queue redis.call('zremrangebyscore', 'ql:completed', 0, now - time) -- Now take the all by the most recent 'count' ids jids = redis.call('zrange', 'ql:completed', 0, (-1-count)) for index, jid in ipairs(jids) do - local tags = cjson.decode( - redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') - for i, tag in ipairs(tags) do - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) - end - redis.call('del', QlessJob.ns .. jid) - redis.call('del', QlessJob.ns .. jid .. '-history') + Qless.job(jid):delete() end redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count)) @@ -1388,6 +1356,50 @@ function QlessJob:throttles() return self._throttles end + +-- Completely removes all the data +-- associated with this job, use +-- with care. +function QlessJob:delete() + local tags = redis.call('hget', QlessJob.ns .. self.jid, 'tags') or '[]' + tags = cjson.decode(tags) + -- remove the jid from each tag + for i, tag in ipairs(tags) do + self:remove_tag(tag) + end + -- Delete the job's data + redis.call('del', QlessJob.ns .. self.jid) + -- Delete the job's history + redis.call('del', QlessJob.ns .. self.jid .. '-history') + -- Delete any notion of dependencies it has + redis.call('del', QlessJob.ns .. self.jid .. '-dependencies') +end + +-- Inserts the jid into the specified tag. +-- This should probably be moved to its own tag +-- object. +function QlessJob:insert_tag(now, tag) + redis.call('zadd', 'ql:t:' .. tag, now, self.jid) + redis.call('zincrby', 'ql:tags', 1, tag) +end + +-- Removes the jid from the specified tag. +-- this should probably be moved to its own tag +-- object. +function QlessJob:remove_tag(tag) + -- Remove the job from the specified tag + redis.call('zrem', 'ql:t:' .. tag, self.jid) + + -- Decrement the tag in the set of all tags. + local score = redis.call('zincrby', 'ql:tags', -1, tag) + + -- if the score for the specified tag is 0 + -- it means we have no jobs with this tag anymore + -- and we should remove it from the set to prevent memory leaks. + if tonumber(score) == 0 then + redis.call('zrem', 'ql:tags', tag) + end +end ------------------------------------------------------------------------------- -- Queue class ------------------------------------------------------------------------------- @@ -1940,8 +1952,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) -- Add this job to the list of jobs tagged with whatever tags were supplied for i, tag in ipairs(tags) do - redis.call('zadd', 'ql:t:' .. tag, now, jid) - redis.call('zincrby', 'ql:tags', 1, tag) + Qless.job(jid):insert_tag(now, tag) end -- If we're in the failed state, remove all of our data @@ -2210,8 +2221,7 @@ function QlessQueue:check_recurring(now, count) -- Add this job to the list of jobs tagged with whatever tags were -- supplied for i, tag in ipairs(_tags) do - redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. count) - redis.call('zincrby', 'ql:tags', 1, tag) + Qless.job(jid .. '-' .. count):insert_tag(now, tag) end -- First, let's save its data @@ -2651,16 +2661,19 @@ function QlessWorker.counts(now, worker) end -- Retrieve the data fro a throttled resource function QlessThrottle:data() + -- Default values for the data + local data = { + id = self.id, + maximum = 0 + } + + -- Retrieve data stored in redis local throttle = redis.call('hmget', QlessThrottle.ns .. self.id, 'id', 'maximum') - -- Return default if it doesn't exist - if not throttle[1] then - return {id = self.id, maximum = 0} + + if throttle[2] then + data.maximum = tonumber(throttle[2]) end - local data = { - id = throttle[1], - maximum = tonumber(throttle[2]) - } return data end diff --git a/lib/qless/lua/qless.lua b/lib/qless/lua/qless.lua index 88c85042..9c00f207 100644 --- a/lib/qless/lua/qless.lua +++ b/lib/qless/lua/qless.lua @@ -1,4 +1,4 @@ --- Current SHA: 3108245a22bf30415f9f3db85059d238ef35c4b0 +-- Current SHA: 9a766e606d5115cb1a2a78f6e2b28be5cf5e121d -- This is a generated file local Qless = { ns = 'ql:' @@ -58,12 +58,6 @@ end function Qless.throttle(tid) assert(tid, 'Throttle(): no tid provided') local throttle = QlessThrottle.data({id = tid}) - if not throttle then - throttle = { - id = tid, - maximum = 0 - } - end setmetatable(throttle, QlessThrottle) throttle.locks = { @@ -213,8 +207,7 @@ function Qless.tag(now, command, ...) _tags[tag] = true table.insert(tags, tag) end - redis.call('zadd', 'ql:t:' .. tag, now, jid) - redis.call('zincrby', 'ql:tags', 1, tag) + Qless.job(jid):insert_tag(now, tag) end tags = cjson.encode(tags) @@ -234,8 +227,7 @@ function Qless.tag(now, command, ...) for i=2,#arg do local tag = arg[i] _tags[tag] = nil - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) + Qless.job(jid):remove_tag(tag) end local results = {} @@ -305,15 +297,15 @@ function Qless.cancel(now, ...) queue:remove_job(jid) end - Qless.job(jid):throttles_release(now) + local job = Qless.job(jid) + + job:throttles_release(now) for i, j in ipairs(redis.call( 'smembers', QlessJob.ns .. jid .. '-dependencies')) do redis.call('srem', QlessJob.ns .. j .. '-dependents', jid) end - redis.call('del', QlessJob.ns .. jid .. '-dependencies') - if state == 'failed' then failure = cjson.decode(failure) redis.call('lrem', 'ql:f:' .. failure.group, 0, jid) @@ -327,19 +319,11 @@ function Qless.cancel(now, ...) 'ql:s:stats:' .. bin .. ':' .. queue, 'failed', failed - 1) end - local tags = cjson.decode( - redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') - for i, tag in ipairs(tags) do - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) - end + job:delete() if redis.call('zscore', 'ql:tracked', jid) ~= false then Qless.publish('canceled', jid) end - - redis.call('del', QlessJob.ns .. jid) - redis.call('del', QlessJob.ns .. jid .. '-history') end end @@ -576,27 +560,14 @@ function QlessJob:complete(now, worker, queue, data, ...) local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time) for index, jid in ipairs(jids) do - local tags = cjson.decode( - redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') - for i, tag in ipairs(tags) do - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) - end - redis.call('del', QlessJob.ns .. jid) - redis.call('del', QlessJob.ns .. jid .. '-history') + Qless.job(jid):delete() end + redis.call('zremrangebyscore', 'ql:completed', 0, now - time) jids = redis.call('zrange', 'ql:completed', 0, (-1-count)) for index, jid in ipairs(jids) do - local tags = cjson.decode( - redis.call('hget', QlessJob.ns .. jid, 'tags') or '{}') - for i, tag in ipairs(tags) do - redis.call('zrem', 'ql:t:' .. tag, jid) - redis.call('zincrby', 'ql:tags', -1, tag) - end - redis.call('del', QlessJob.ns .. jid) - redis.call('del', QlessJob.ns .. jid .. '-history') + Qless.job(jid):delete() end redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count)) @@ -1040,6 +1011,32 @@ function QlessJob:throttles() return self._throttles end + +function QlessJob:delete() + local tags = redis.call('hget', QlessJob.ns .. self.jid, 'tags') or '[]' + tags = cjson.decode(tags) + for i, tag in ipairs(tags) do + self:remove_tag(tag) + end + redis.call('del', QlessJob.ns .. self.jid) + redis.call('del', QlessJob.ns .. self.jid .. '-history') + redis.call('del', QlessJob.ns .. self.jid .. '-dependencies') +end + +function QlessJob:insert_tag(now, tag) + redis.call('zadd', 'ql:t:' .. tag, now, self.jid) + redis.call('zincrby', 'ql:tags', 1, tag) +end + +function QlessJob:remove_tag(tag) + redis.call('zrem', 'ql:t:' .. tag, self.jid) + + local score = redis.call('zincrby', 'ql:tags', -1, tag) + + if tonumber(score) == 0 then + redis.call('zrem', 'ql:tags', tag) + end +end function Qless.queue(name) assert(name, 'Queue(): no queue name provided') local queue = {} @@ -1456,8 +1453,7 @@ function QlessQueue:put(now, worker, jid, klass, raw_data, delay, ...) end for i, tag in ipairs(tags) do - redis.call('zadd', 'ql:t:' .. tag, now, jid) - redis.call('zincrby', 'ql:tags', 1, tag) + Qless.job(jid):insert_tag(now, tag) end if state == 'failed' then @@ -1669,8 +1665,7 @@ function QlessQueue:check_recurring(now, count) moved = moved + 1 for i, tag in ipairs(_tags) do - redis.call('zadd', 'ql:t:' .. tag, now, jid .. '-' .. count) - redis.call('zincrby', 'ql:tags', 1, tag) + Qless.job(jid .. '-' .. count):insert_tag(now, tag) end local child_jid = jid .. '-' .. count @@ -1985,15 +1980,17 @@ function QlessWorker.counts(now, worker) end end function QlessThrottle:data() + local data = { + id = self.id, + maximum = 0 + } + local throttle = redis.call('hmget', QlessThrottle.ns .. self.id, 'id', 'maximum') - if not throttle[1] then - return {id = self.id, maximum = 0} + + if throttle[2] then + data.maximum = tonumber(throttle[2]) end - local data = { - id = throttle[1], - maximum = tonumber(throttle[2]) - } return data end diff --git a/lib/qless/qless-core b/lib/qless/qless-core index f7ef7351..9a766e60 160000 --- a/lib/qless/qless-core +++ b/lib/qless/qless-core @@ -1 +1 @@ -Subproject commit f7ef735105ade320fef8f621bf264851f246924a +Subproject commit 9a766e606d5115cb1a2a78f6e2b28be5cf5e121d