Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5aa07d7
feat: create a /jobs api to return queue and history jobs
ric-yu Dec 2, 2025
aa00641
update unused vars
ric-yu Dec 2, 2025
b2bd48e
include priority
ric-yu Dec 2, 2025
b874f46
create jobs helper file
ric-yu Dec 3, 2025
380b6ae
fix ruff
ric-yu Dec 3, 2025
048c413
update how we set error message
ric-yu Dec 3, 2025
2e0b26b
include execution error in both responses
ric-yu Dec 4, 2025
e4c7136
rename error -> failed, fix output shape
ric-yu Dec 4, 2025
90fb5cc
re-use queue and history functions
ric-yu Dec 4, 2025
8a2bb7c
set workflow id
ric-yu Dec 4, 2025
b7c7712
allow srot by exec duration
ric-yu Dec 4, 2025
a38aacf
fix tests
ric-yu Dec 4, 2025
1b67306
send priority and remove error msg
ric-yu Dec 4, 2025
5274b91
use ws messages to get start and end times
ric-yu Dec 4, 2025
c860cc6
revert main.py fully
ric-yu Dec 4, 2025
5ed90a1
refactor: move all /jobs business logic to jobs.py
ric-yu Dec 4, 2025
b14ae80
fix failing test
ric-yu Dec 4, 2025
a10cb30
remove some tests
ric-yu Dec 4, 2025
86590ca
fix non dict nodes
ric-yu Dec 8, 2025
460b848
address comments
ric-yu Dec 8, 2025
1f7c1a9
filter by workflow id and remove null fields
ric-yu Dec 9, 2025
ed61899
add clearer typing - remove get("..") or ..
ric-yu Dec 12, 2025
c8a1d2e
refactor query params to top get_job(s) doc, add remove_sensitive_fro…
ric-yu Dec 12, 2025
7f4fb73
Merge branch 'master' into feature/unified-jobs-api
Kosinkadink Dec 16, 2025
03a7f1c
add brief comment explaining why we skip animated
ric-yu Dec 17, 2025
724fbf8
comment that format field is for frontend backward compatibility
ric-yu Dec 17, 2025
786703a
fix whitespace
ric-yu Dec 18, 2025
be341c9
Merge branch 'master' into feature/unified-jobs-api
guill Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor query params to top get_job(s) doc, add remove_sensitive_fro…
…m_queue
  • Loading branch information
ric-yu committed Dec 12, 2025
commit c8a1d2ea0ddabe94cec3814ce7f07bb2f2bcaf15
14 changes: 10 additions & 4 deletions comfy_execution/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ def is_previewable(media_type: str, item: dict) -> bool:


def normalize_queue_item(item: tuple, status: str) -> dict:
"""Convert queue item tuple to unified job dict."""
priority, prompt_id, _, extra_data, _ = item[:5]
"""Convert queue item tuple to unified job dict.

Expects item with sensitive data already removed (5 elements).
"""
priority, prompt_id, _, extra_data, _ = item
create_time, workflow_id = _extract_job_metadata(extra_data)

return prune_dict({
Expand All @@ -79,9 +82,12 @@ def normalize_queue_item(item: tuple, status: str) -> dict:


def normalize_history_item(prompt_id: str, history_item: dict, include_outputs: bool = False) -> dict:
"""Convert history item dict to unified job dict."""
"""Convert history item dict to unified job dict.

History items have sensitive data already removed (prompt tuple has 5 elements).
"""
prompt_tuple = history_item['prompt']
priority, _, prompt, extra_data, _ = prompt_tuple[:5]
priority, _, prompt, extra_data, _ = prompt_tuple
create_time, workflow_id = _extract_job_metadata(extra_data)

status_info = history_item.get('status', {})
Expand Down
43 changes: 32 additions & 11 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@
if args.enable_manager:
import comfyui_manager


def _remove_sensitive_from_queue(queue: list) -> list:
"""Remove sensitive data (index 5) from queue item tuples."""
return [item[:5] for item in queue]


async def send_socket_catch_exception(function, message):
try:
await function(message)
Expand Down Expand Up @@ -697,36 +703,48 @@ async def get_object_info_node(request):

@routes.get("/api/jobs")
async def get_jobs(request):
"""List all jobs with filtering, sorting, and pagination."""
"""List all jobs with filtering, sorting, and pagination.

Query parameters:
status: Filter by status (comma-separated): pending, in_progress, completed, failed
workflow_id: Filter by workflow ID
sort_by: Sort field: created_at (default), execution_duration
sort_order: Sort direction: asc, desc (default)
limit: Max items to return (positive integer)
offset: Items to skip (non-negative integer, default 0)
"""
query = request.rel_url.query

status_param = query.get("status", None)
status_param = query.get('status')
workflow_id = query.get('workflow_id')
sort_by = query.get('sort_by', 'created_at').lower()
sort_order = query.get('sort_order', 'desc').lower()

status_filter = None
if status_param:
status_filter = [s.strip().lower() for s in status_param.split(',') if s.strip()]
valid_statuses = set(JobStatus.ALL)
invalid_statuses = [s for s in status_filter if s not in valid_statuses]
invalid_statuses = [s for s in status_filter if s not in JobStatus.ALL]
if invalid_statuses:
return web.json_response(
{"error": f"Invalid status value(s): {', '.join(invalid_statuses)}. Valid values: {', '.join(JobStatus.ALL)}"},
status=400
)

sort_by = query.get('sort_by', 'created_at').lower()
if sort_by not in {'created_at', 'execution_duration'}:
return web.json_response(
{"error": "sort_by must be 'created_at' or 'execution_duration'"},
status=400
)

sort_order = query.get('sort_order', 'desc').lower()
if sort_order not in {'asc', 'desc'}:
return web.json_response(
{"error": "sort_order must be 'asc' or 'desc'"},
status=400
)

limit = None

# If limit is provided, validate that it is a positive integer, else continue without a limit
if 'limit' in query:
try:
limit = int(query.get('limit'))
Expand All @@ -753,11 +771,12 @@ async def get_jobs(request):
status=400
)

workflow_id = query.get('workflow_id', None)

running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history()

running = _remove_sensitive_from_queue(running)
queued = _remove_sensitive_from_queue(queued)

jobs, total = get_all_jobs(
running, queued, history,
status_filter=status_filter,
Expand Down Expand Up @@ -793,6 +812,9 @@ async def get_job_by_id(request):
running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history(prompt_id=job_id)

running = _remove_sensitive_from_queue(running)
queued = _remove_sensitive_from_queue(queued)

job = get_job(job_id, running, queued, history)
if job is None:
return web.json_response(
Expand Down Expand Up @@ -825,9 +847,8 @@ async def get_history_prompt_id(request):
async def get_queue(request):
queue_info = {}
current_queue = self.prompt_queue.get_current_queue_volatile()
remove_sensitive = lambda queue: [x[:5] for x in queue]
queue_info['queue_running'] = remove_sensitive(current_queue[0])
queue_info['queue_pending'] = remove_sensitive(current_queue[1])
queue_info['queue_running'] = _remove_sensitive_from_queue(current_queue[0])
queue_info['queue_pending'] = _remove_sensitive_from_queue(current_queue[1])
return web.json_response(queue_info)

@routes.post("/prompt")
Expand Down
Loading