diff --git a/lib/qless/queue.rb b/lib/qless/queue.rb index ebc730b3..b0998b68 100644 --- a/lib/qless/queue.rb +++ b/lib/qless/queue.rb @@ -17,6 +17,10 @@ def running(start = 0, count = 25) @client.call('jobs', 'running', @name, start, count) end + def throttled(start = 0, count = 25) + @client.call('jobs', 'throttled', @name, start, count) + end + def stalled(start = 0, count = 25) @client.call('jobs', 'stalled', @name, start, count) end diff --git a/lib/qless/server.rb b/lib/qless/server.rb index 7894e3cc..38af8ba9 100755 --- a/lib/qless/server.rb +++ b/lib/qless/server.rb @@ -173,7 +173,7 @@ def strftime(t) json(client.queues[params[:name]].counts) end - filtered_tabs = %w[ running scheduled stalled depends recurring ].to_set + filtered_tabs = %w[ running throttled scheduled stalled depends recurring ].to_set get '/queues/:name/?:tab?' do queue = client.queues[params[:name]] tab = params.fetch('tab', 'stats') diff --git a/lib/qless/server/views/queue.erb b/lib/qless/server/views/queue.erb index 14e5b2f3..9f2e5f9b 100644 --- a/lib/qless/server/views/queue.erb +++ b/lib/qless/server/views/queue.erb @@ -43,6 +43,7 @@
  • ">Stats
  • ">Running
  • ">Waiting
  • +
  • ">Throttled
  • ">Scheduled
  • ">Stalled
  • ">Depends
  • @@ -57,9 +58,10 @@

    "><%= queue['name'] %> | <%= queue['running'] %> / <%= queue['waiting'] %> / + <%= queue['throttled'] %> / <%= queue['scheduled'] %> / <%= queue['stalled'] %> / - <%= queue['depends'] %> (running / waiting / scheduled / stalled / depends) + <%= queue['depends'] %> (running / waiting / throttled / scheduled / stalled / depends)

    @@ -74,7 +76,7 @@ -<% if ['running', 'waiting', 'scheduled', 'stalled', 'depends', 'recurring'].include?(tab) %> +<% if ['running', 'waiting', 'throttled', 'scheduled', 'stalled', 'depends', 'recurring'].include?(tab) %>
    <%= erb :_job_list, :locals => { :jobs => jobs, :queues => queues } %> <% else %> diff --git a/lib/qless/server/views/queues.erb b/lib/qless/server/views/queues.erb index 8509d849..fd3bb33a 100644 --- a/lib/qless/server/views/queues.erb +++ b/lib/qless/server/views/queues.erb @@ -33,9 +33,11 @@

    | <%= queue['running'] %> / <%= queue['waiting'] %> / + <%= queue['throttled'] %> / <%= queue['scheduled'] %> / <%= queue['stalled'] %> / - <%= queue['depends'] %> (running / waiting / scheduled / stalled / depends) + <%= queue['depends'] %> / + <%= queue['recurring'] %> (running / waiting / throttled / scheduled / stalled / depends / recurring)

    diff --git a/lib/qless/server/views/track.erb b/lib/qless/server/views/track.erb index 7a9d4439..617f341c 100644 --- a/lib/qless/server/views/track.erb +++ b/lib/qless/server/views/track.erb @@ -11,6 +11,7 @@ var fade = function(jid, type) {
  • All (<%= tracked['jobs'].length %>)
  • Running (<%= tracked['jobs'].select { |job| job.state == 'running' }.length %>)
  • Waiting (<%= tracked['jobs'].select { |job| job.state == 'waiting' }.length %>)
  • +
  • Throttled (<%= tracked['jobs'].select { |job| job.state == 'throttled'}.length %>)
  • Scheduled (<%= tracked['jobs'].select { |job| job.state == 'scheduled' }.length %>)
  • Stalled (<%= tracked['jobs'].select { |job| job.state == 'stalled' }.length %>)
  • Completed (<%= tracked['jobs'].select { |job| job.state == 'complete' }.length %>)
  • @@ -41,6 +42,11 @@ var fade = function(jid, type) { <%= erb :_job, :layout => false, :locals => { :job => job, :queues => queues } %> <% end %> +
    + <% tracked['jobs'].select { |job| job.state == 'throttled' }.each do |job| %> + <%= erb :_job, :layout => false, :locals => { :job => job, :queues => queues } %> + <% end %> +
    <% tracked['jobs'].select { |job| job.state == 'scheduled' }.each do |job| %> <%= erb :_job, :layout => false, :locals => { :job => job, :queues => queues } %> diff --git a/spec/integration/server_spec.rb b/spec/integration/server_spec.rb index 7caed7a5..2d7ee8dd 100644 --- a/spec/integration/server_spec.rb +++ b/spec/integration/server_spec.rb @@ -134,23 +134,34 @@ def test_pagination(page_1_jid = 1, page_2_jid = 27) q.put(Qless::Job, {}) visit '/' first('.queue-row', text: /testing/).should be - first('.queue-row', text: /0\D+1\D+0\D+0\D+0/).should be + first('.queue-row', text: /0\D+1\D+0\D+0\D+0\D+0\D+0/).should be first('h1', text: /no queues/i).should be_nil first('h1', text: /queues and their job counts/i).should be # Let's pop the job, and make sure that we can see /that/ job = q.pop visit '/' - first('.queue-row', text: /1\D+0\D+0\D+0\D+0/).should be + first('.queue-row', text: /1\D+0\D+0\D+0\D+0\D+0\D+0/).should be first('.worker-row', text: q.worker_name).should be first('.worker-row', text: /1\D+0/i).should be # Let's complete the job, and make sure it disappears job.complete visit '/' - first('.queue-row', text: /0\D+0\D+0\D+0\D+0/).should be + first('.queue-row', text: /0\D+0\D+0\D+0\D+0\D+0\D+0/).should be first('.worker-row', text: /0\D+0/i).should be + # Let's throttle a job, and make sure we see it + client.throttles['one'].maximum = 1 + q.put(Qless::Job, {}, :throttles => ["one"]) + q.put(Qless::Job, {}, :throttles => ["one"]) + job1 = q.pop + job2 = q.pop + visit '/' + first('.queue-row', text: /1\D+0\D+1\D+0\D+0\D+0\D+0/).should be + job1.complete + q.pop.complete + # Let's put and pop and fail a job, and make sure we see it q.put(Qless::Job, {}) job = q.pop @@ -162,11 +173,11 @@ def test_pagination(page_1_jid = 1, page_2_jid = 27) # And let's have one scheduled, and make sure it shows up accordingly jid = q.put(Qless::Job, {}, delay: 60) visit '/' - first('.queue-row', text: /0\D+0\D+1\D+0\D+0/).should be + first('.queue-row', text: /0\D+0\D+0\D+1\D+0\D+0\D+0/).should be # And one that depends on that job q.put(Qless::Job, {}, depends: [jid]) visit '/' - first('.queue-row', text: /0\D+0\D+1\D+0\D+1/).should be + first('.queue-row', text: /0\D+0\D+0\D+1\D+0\D+1\D+0/).should be end it 'can visit the tracked page' do @@ -193,6 +204,17 @@ def test_pagination(page_1_jid = 1, page_2_jid = 27) first('a', text: /completed\W+1/i).should be job.untrack + # And now for a throttled job + client.throttles['one'].maximum = 1 + q.put(Qless::Job, {}, throttles: ["one"]) + job = client.jobs[q.put(Qless::Job, {}, throttles: ["one"])] + job.track + q.pop(2) + visit '/track' + first('a', text: /all\W+1/i).should be + first('a', text: /throttled\W+1/i).should be + job.untrack + # And now for a scheduled job job = client.jobs[q.put(Qless::Job, {}, delay: 600)] job.track @@ -634,6 +656,52 @@ def test_pagination(page_1_jid = 1, page_2_jid = 27) groups.map { |g| g.text }.join(' ').should eq('e j i h g f d c b a') end + it 'can visit /queues' do + # We should be able to see all of the appropriate tabs, + # We should be able to see all of the jobs + jid = q.put(Qless::Job, {}) + + # We should see this job + visit '/queues' + first('h3', text: /0\D+1\D+0\D+0\D+0\D+0\D+0/).should be + + # Now let's pop off the job so that it's running + job = q.pop + visit '/queues' + first('h3', text: /1\D+0\D+0\D+0\D+0\D+0\D+0/).should be + job.complete + + # And now for a throttled job + client.throttles['one'].maximum = 1 + q.put(Qless::Job, {}, throttles: ["one"]) + q.put(Qless::Job, {}, throttles: ["one"]) + job1, job2 = q.pop(2) + visit '/queues' + first('h3', text: /1\D+0\D+1\D+0\D+0\D+0\D+0/).should be + job1.complete + q.pop.complete + + # And now for a scheduled job + job = client.jobs[q.put(Qless::Job, {}, delay: 600)] + visit '/queues' + first('h3', text: /0\D+0\D+0\D+1\D+0\D+0\D+0/).should be + job.cancel + + # And now a dependent job + job1 = client.jobs[q.put(Qless::Job, {})] + job2 = client.jobs[q.put(Qless::Job, {}, depends: [job1.jid])] + visit '/queues' + first('h3', text: /0\D+1\D+0\D+0\D+0\D+1\D+0/).should be + job2.cancel + job1.cancel + + # And now a recurring job + job = client.jobs[q.recur(Qless::Job, {}, 5)] + visit '/queues' + first('h3', text: /0\D+0\D+0\D+0\D+0\D+0\D+1/).should be + job.cancel + end + it 'can visit the various /queues/* endpoints' do # We should be able to see all of the appropriate tabs, # We should be able to see all of the jobs @@ -648,6 +716,16 @@ def test_pagination(page_1_jid = 1, page_2_jid = 27) first('h2', text: /#{jid[0...8]}/).should be job.complete + # And now for a throttled job + client.throttles['one'].maximum = 1 + job1 = client.jobs[q.put(Qless::Job, {}, throttles: ["one"])] + job2 = client.jobs[q.put(Qless::Job, {}, throttles: ["one"])] + q.pop(2) + visit '/queues/testing/throttled' + first('h2', text: /#{job2.jid[0...8]}/).should be + job1.cancel + job2.cancel + # And now for a scheduled job job = client.jobs[q.put(Qless::Job, {}, delay: 600)] visit '/queues/testing/scheduled' @@ -690,6 +768,16 @@ def test_pagination(page_1_jid = 1, page_2_jid = 27) job.untrack first('.tracked-row', text: /complete/i).should be + # And now for a throttled job + client.throttles['one'].maximum = 1 + job1 = client.jobs[q.put(Qless::Job, {}, throttles: ["one"])] + job2 = client.jobs[q.put(Qless::Job, {}, throttles: ["one"])] + job2.track + q.pop(2) + visit '/' + first('.tracked-row', text: /throttled/i).should be + job2.untrack + # And now for a scheduled job job = client.jobs[q.put(Qless::Job, {}, delay: 600)] job.track