Merge branch 'main' into peteski22/wait-for/testing
peteski22 committed Apr 2, 2025
commit d1d111fdfdc4dfd08ca1a545e8bdd1b6984f9827
2 changes: 1 addition & 1 deletion docs/source/specs/openapi.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions lumigator/backend/backend/services/jobs.py
@@ -1,6 +1,7 @@
 import asyncio
 import csv
 import json
+import re
 import time
 from http import HTTPStatus
 from io import BytesIO, StringIO
18 changes: 13 additions & 5 deletions lumigator/backend/backend/services/workflows.py
@@ -171,18 +171,23 @@ async def _run_inference_eval_pipeline(
         try:
             # Wait for the inference job to 'complete'.
             status = await self._job_service.wait_for_job_complete(
-                inference_job.id, timeout_seconds=request.job_timeout_sec
+                job_id=inference_job.id, timeout_seconds=request.job_timeout_sec
             )
 
             if status != JobStatus.SUCCEEDED:
                 # Trigger the failure handling logic
                 raise JobUpstreamError(f"Inference job {inference_job.id} failed with status {status}") from None
-        except TimeoutError as e:
+
+            # Mark the job as successful in the tracking client.
+            await self._tracking_client.update_workflow_status(inference_run_id, WorkflowStatus.SUCCEEDED)
+        except TimeoutError as timeout_exc:
             loguru.logger.error(
-                "Workflow pipeline error: Workflow {}. Inference job: {} failed: {}", workflow.id, inference_job.id, e
+                f"Workflow pipeline error: "
+                f"Workflow {workflow.id}. "
+                f"Inference job: {inference_job.id} failed: "
+                f"{timeout_exc}"
             )
-            await self._handle_workflow_failure(workflow.id)
-            return
+            return await self._handle_workflow_failure(workflow.id)
 
         try:
             # Figure out the dataset filename
@@ -298,6 +303,9 @@ async def _run_inference_eval_pipeline(
 
             # TODO: Handle other error types that can be raised by the method.
             self._job_service._validate_results(evaluation_job.id, self._dataset_service.s3_filesystem)
+
+            # Mark the job as successful in the tracking client.
+            await self._tracking_client.update_workflow_status(eval_run_id, WorkflowStatus.SUCCEEDED)
         except (TimeoutError, ValidationError) as e:
             loguru.logger.error(
                 "Workflow pipeline error: Workflow {}. Evaluation job: {} failed: {}", workflow.id, evaluation_job.id, e
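Taken together, the workflows.py changes make each pipeline stage wait for its job, record success in the tracking client, and turn a TimeoutError into a workflow failure. Below is a minimal, self-contained sketch of that control flow; the stub service, stub tracking client, and run_stage helper are illustrative stand-ins, not the project's actual interfaces.

import asyncio
from enum import Enum


class JobStatus(str, Enum):
    SUCCEEDED = "succeeded"
    FAILED = "failed"


class WorkflowStatus(str, Enum):
    SUCCEEDED = "succeeded"
    FAILED = "failed"


class JobUpstreamError(Exception):
    """Raised when an upstream job ends in a non-successful state."""


class StubJobService:
    """Illustrative stand-in for the backend job service."""

    async def wait_for_job_complete(self, job_id: str, timeout_seconds: float) -> JobStatus:
        # A real implementation would poll the job until it reaches a terminal state,
        # raising TimeoutError once timeout_seconds is exceeded.
        await asyncio.sleep(0)
        return JobStatus.SUCCEEDED


class StubTrackingClient:
    """Illustrative stand-in for the tracking client."""

    async def update_workflow_status(self, run_id: str, status: WorkflowStatus) -> None:
        print(f"run {run_id} -> {status.value}")


async def run_stage(job_service, tracking_client, job_id: str, run_id: str, timeout_seconds: float) -> bool:
    """Wait for one pipeline stage and record the outcome; False means the workflow should be failed."""
    try:
        status = await job_service.wait_for_job_complete(job_id=job_id, timeout_seconds=timeout_seconds)
        if status != JobStatus.SUCCEEDED:
            raise JobUpstreamError(f"Job {job_id} failed with status {status}")
        # Mark the run as successful in the tracking client before the next stage starts.
        await tracking_client.update_workflow_status(run_id, WorkflowStatus.SUCCEEDED)
        return True
    except (TimeoutError, JobUpstreamError) as exc:
        print(f"Pipeline stage failed: {exc}")
        return False


asyncio.run(run_stage(StubJobService(), StubTrackingClient(), "job-1", "run-1", timeout_seconds=30.0))

The sketch folds the timeout and upstream-failure paths into a single boolean return; the service code shown above instead calls its _handle_workflow_failure helper when the wait times out.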
@@ -442,49 +442,73 @@ def test_job_non_existing(local_client: TestClient, dependency_overrides_services):
assert response.json()["detail"] == f"Job with ID {non_existing_id} not found"


def wait_for_workflow_complete(
local_client: TestClient,
workflow_id: UUID,
timeout_seconds: int = 300,
initial_poll_interval_seconds: float = 1.0,
max_poll_interval_seconds: float = 10.0,
backoff_factor: float = 1.5,
):
start_time = time.time()
workflow_status = WorkflowStatus.CREATED
status_retrieved = False
poll_interval = initial_poll_interval_seconds
def wait_for_workflow_complete(local_client: TestClient, workflow_id: UUID) -> WorkflowDetailsResponse | None:
"""Wait for the workflow to complete, including post-completion processing for successful
workflows to create compiled results.

Makes a total of ``MAX_POLLS`` (as configured in the ``conftest.py``).
Sleeps for ``POLL_WAIT_TIME`` seconds between each poll (as configured in the ``conftest.py``).

logger.info(f"Waiting for workflow {workflow_id} to complete (timeout {timeout_seconds} seconds)...")
:param local_client: The test client.
:param workflow_id: The workflow ID of the workflow to wait for.
:return: The workflow details, or ``None`` if the workflow did not reach the required completed state
within the maximum number of polls.
"""
attempt = 0
max_attempts = MAX_POLLS
wait_duration = POLL_WAIT_TIME

while attempt < max_attempts:
# Allow the waiting interval if we're coming around again.
if attempt > 0:
time.sleep(wait_duration)

while time.time() - start_time < timeout_seconds:
attempt += 1
try:
response = local_client.get(f"/workflows/{workflow_id}")
response.raise_for_status()

workflow_details = WorkflowDetailsResponse.model_validate(response.json())
workflow_status = workflow_details.status
status_retrieved = True

if workflow_status in {WorkflowStatus.SUCCEEDED, WorkflowStatus.FAILED}:
logger.info(f"Workflow {workflow_id} completed with status: {workflow_status}")
return workflow_details

logger.info(f"Workflow {workflow_id}, current status: {workflow_status}")

# Validation failure will raise an exception (``ValidationError``) which is fine
# as if we're getting a response we expect it to be valid.
workflow = WorkflowDetailsResponse.model_validate(response.json())
except (RequestError, HTTPStatusError) as e:
logger.error(f"Workflow {workflow_id}, request failed (HTTP): {e}")
except ValidationError as e:
logger.error(f"Workflow {workflow_id}, response parse error: {e}")

time.sleep(poll_interval)
poll_interval = min(poll_interval * backoff_factor, max_poll_interval_seconds)

raise TimeoutError(
f"Workflow {workflow_id} did not complete within {timeout_seconds} seconds.(last status: {workflow_status})"
if status_retrieved
else "(status never retrieved)"
)
# Log the error but allow us to retry the request until we've maxed out our attempts.
logger.warning(f"Workflow: {workflow_id}, request: ({attempt}/{max_attempts}) failed: {e}")
continue

# Check if the workflow is not in a terminal state.
if workflow.status not in {WorkflowStatus.SUCCEEDED, WorkflowStatus.FAILED}:
logger.info(
f"Workflow: {workflow_id}, "
f"request: ({attempt}/{max_attempts}), "
f"status: {workflow.status}, "
f"not in terminal state"
)
continue

# If the workflow failed, we can stop checking.
if workflow.status == WorkflowStatus.FAILED:
return workflow

# The workflow was successful, but we need the artifacts download url to be populated.
if not workflow.artifacts_download_url:
logger.info(
f"Workflow: {workflow_id}, "
f"request: ({attempt}/{max_attempts}), "
f"status: {workflow.status}, "
f"artifacts not ready"
)
continue

logger.info(
f"Workflow: {workflow_id},"
f"request: ({attempt}/{max_attempts}), "
f"status: {workflow.status}, "
f"succeeded and processed"
)
return workflow

# Couldn't get the workflow details within the maximum number of polls.
return None


def _test_launch_job_with_secret(
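A sketch of how a test might consume the reworked helper; the test name and fixtures below are hypothetical, while wait_for_workflow_complete, WorkflowStatus, and the artifacts_download_url field come from the diff above.

def test_workflow_reaches_success(local_client, workflow_id):
    # Poll until the workflow is finished and, on success, its artifacts are compiled.
    workflow = wait_for_workflow_complete(local_client, workflow_id)

    # None means no terminal, fully processed state was seen within MAX_POLLS attempts.
    assert workflow is not None, "workflow did not complete within the configured number of polls"
    assert workflow.status == WorkflowStatus.SUCCEEDED
    # For successful workflows the helper also waits for the artifacts download URL to be populated.
    assert workflow.artifacts_download_url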