Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add the exclude_future_jobs arg to the queue_depth command
The queue_depth command can call get_queue_depths with the option to exclude jobs in the future
  • Loading branch information
George9Waller committed Feb 21, 2025
commit 738be6db1603c96429023f48d29022c154757481
5 changes: 4 additions & 1 deletion django_dbq/management/commands/queue_depth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ class Command(BaseCommand):

def add_arguments(self, parser):
parser.add_argument("queue_name", nargs="*", default=["default"], type=str)
parser.add_argument("--exclude_future_jobs", default=False, type=bool)

def handle(self, *args, **options):
queue_names = options["queue_name"]
queue_depths = Job.get_queue_depths()
queue_depths = Job.get_queue_depths(
exclude_future_jobs=options["exclude_future_jobs"]
)

queue_depths_string = " ".join(
[
Expand Down
30 changes: 28 additions & 2 deletions django_dbq/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,19 @@ def test_get_queue_depths_exclude_future_jobs(self):
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 1})


@freezegun.freeze_time("2025-01-01T12:00:00Z")
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
class QueueDepthTestCase(TestCase):
def test_queue_depth(self):

Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.NEW)
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
Job.objects.create(name="testjob", state=Job.STATES.READY)
Job.objects.create(
name="testjob",
state=Job.STATES.READY,
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)
Expand All @@ -132,6 +136,28 @@ def test_queue_depth(self):
output = stdout.getvalue()
self.assertEqual(output.strip(), "event=queue_depths default=2")

def test_queue_depth_exclude_future_jobs(self):
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.NEW)
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
Job.objects.create(
name="testjob",
state=Job.STATES.READY,
run_after=timezone.make_aware(datetime(2025, 1, 1, 13, 0, 0)),
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)
Job.objects.create(
name="testjob", queue_name="testqueue", state=Job.STATES.READY
)

stdout = StringIO()
call_command("queue_depth", exclude_future_jobs=True, stdout=stdout)
output = stdout.getvalue()
self.assertEqual(output.strip(), "event=queue_depths default=1")

def test_queue_depth_multiple_queues(self):

Job.objects.create(name="testjob", state=Job.STATES.FAILED)
Expand Down