2 changes: 1 addition & 1 deletion docs/source/specs/openapi.json

Large diffs are not rendered by default.

62 changes: 37 additions & 25 deletions lumigator/backend/backend/services/jobs.py
@@ -92,7 +92,12 @@ class JobService:
"""list: A list of non-terminal job statuses."""

# TODO: rely on https://github.com/ray-project/ray/blob/7c2a200ef84f17418666dad43017a82f782596a3/python/ray/dashboard/modules/job/common.py#L53
TERMINAL_STATUS = [JobStatus.FAILED.value, JobStatus.SUCCEEDED.value, JobStatus.STOPPED.value]
TERMINAL_STATUS = [
JobStatus.FAILED.value,
JobStatus.SUCCEEDED.value,
JobStatus.STOPPED.value,
JobStatus.UNRECOVERABLE.value,
]
"""list: A list of terminal job statuses."""

SAFE_JOB_NAME_REGEX = re.compile(r"[^\w\-_.]")
@@ -110,15 +115,15 @@ def __init__(
secret_service: SecretService,
background_tasks: BackgroundTasks,
):
self.job_repo = job_repo
self.result_repo = result_repo
self.ray_client = ray_client
self._job_repo = job_repo
self._result_repo = result_repo
self._ray_client = ray_client
self._dataset_service = dataset_service
self._secret_service = secret_service
self._background_tasks = background_tasks

def _get_job_record_per_type(self, job_type: str) -> list[JobRecord]:
records = self.job_repo.get_by_job_type(job_type)
records = self._job_repo.get_by_job_type(job_type)
if records is None:
return []
return records
Expand All @@ -131,7 +136,7 @@ def _get_job_record(self, job_id: UUID) -> JobRecord:
:rtype: JobRecord
:raises JobNotFoundError: If the job does not exist
"""
record = self.job_repo.get(job_id)
record = self._job_repo.get(job_id)
if record is None:
raise JobNotFoundError(job_id) from None

@@ -182,7 +187,7 @@ def _update_job_record(self, job_id: UUID, **updates) -> JobRecord:
:rtype: JobRecord
:raises JobNotFoundError: If the job does not exist in the database
"""
record = self.job_repo.update(job_id, **updates)
record = self._job_repo.update(job_id, **updates)
if record is None:
raise JobNotFoundError(job_id) from None

@@ -312,18 +317,20 @@ def _validate_results(self, job_id: UUID, s3_file_system: S3FileSystem) -> JobRe
def get_upstream_job_status(self, job_id: UUID) -> str:
"""Returns the (lowercase) status of the upstream job.

Example: PENDING, RUNNING, STOPPED, SUCCEEDED, FAILED.
Example: pending, running, stopped, succeeded, failed.

:param job_id: The ID of the job to retrieve the status for.
:return: The status of the upstream job.
:rtype: str
:raises JobUpstreamError: If there is an error with the upstream service returning the
job status
:raises JobNotFoundError: If the job cannot be found in the upstream service.
:raises JobUpstreamError: If there is an error with the upstream service returning the job status
"""
try:
status_response = self.ray_client.get_job_status(str(job_id))
status_response = self._ray_client.get_job_status(str(job_id))
return str(status_response.value.lower())
except RuntimeError as e:
# See: https://github.com/ray-project/ray/blob/24ad12d81f8201859f2f00919929e00a750fa4d2/python/ray/dashboard/modules/dashboard_sdk.py#L282-L285
if "status code 404" in str(e):
raise JobNotFoundError(job_id, "Job not found in upstream Ray service") from e
raise JobUpstreamError("ray", "error getting Ray job status") from e

def get_job_logs(self, job_id: UUID) -> JobLogsResponse:
@@ -335,7 +342,7 @@ def get_job_logs(self, job_id: UUID) -> JobLogsResponse:
:raises JobUpstreamError: If there is an error with the upstream service returning the job logs,
and there are no logs currently persisted in Lumigator's storage.
"""
job = self.job_repo.get(job_id)
job = self._job_repo.get(job_id)
if not job:
raise JobNotFoundError(job_id) from None

Expand Down Expand Up @@ -456,7 +463,7 @@ def create_job(
# Create a db record for the job
# To find the experiment that a job belongs to,
# we'd use https://mlflow.org/docs/latest/python_api/mlflow.client.html#mlflow.client.MlflowClient.search_runs
record = self.job_repo.create(name=request.name, description=request.description, job_type=job_type)
record = self._job_repo.create(name=request.name, description=request.description, job_type=job_type)
job_result_storage_path = self._get_s3_uri(record.id)
dataset_s3_path = self._dataset_service.get_dataset_s3_path(request.dataset)
job_config = job_settings.generate_config(request, record.id, dataset_s3_path, job_result_storage_path)
@@ -506,7 +513,7 @@ def create_job(
num_gpus=settings.RAY_WORKER_GPUS,
)
loguru.logger.info(f"Submitting {job_type} Ray job...")
submit_ray_job(self.ray_client, entrypoint)
submit_ray_job(self._ray_client, entrypoint)

# NOTE: Only inference jobs can store results in a dataset atm. Among them:
# - prediction jobs are run in a workflow before evaluations => they trigger dataset saving
@@ -531,21 +538,26 @@ def get_job(self, job_id: UUID) -> JobResponse:
:param job_id: the ID of the job to retrieve
:return: the job record which includes information on whether a job belongs to an experiment
:rtype: JobRecord
:raises JobNotFoundError: If the job does not exist
:raises JobNotFoundError: If the job does not exist in Lumigator.
:raises JobUpstreamError: If there is an error with the upstream service returning the latest job status.
"""
record = self._get_job_record(job_id)
loguru.logger.info(f"Obtaining info for job {job_id}: {record.name}")
loguru.logger.info(f"Obtained info for job ID: {job_id} name: {record.name}")

# If the job is finished (successfully or not), return the record.
if record.status.value in self.TERMINAL_STATUS:
return JobResponse.model_validate(record)

# get job status from ray
job_status = self.ray_client.get_job_status(job_id)
loguru.logger.info(f"Obtaining info from ray for job {job_id}: {job_status}")
# Attempt to get the latest job status from the upstream service.
try:
job_status = self.get_upstream_job_status(job_id)
except JobNotFoundError as e:
job_status = JobStatus.UNRECOVERABLE.value.lower()
loguru.logger.error(f"Job ID: {job_id} cannot be found in Ray, Using status: {job_status}", f"error: {e}")

# update job status in the DB if it differs from the current status
if job_status.lower() != record.status.value.lower():
record = self._update_job_record(job_id, status=job_status.lower())
# Update job status in the DB if it differs from the current status
if job_status != record.status.value.lower():
record = self._update_job_record(job_id, status=job_status)

return JobResponse.model_validate(record)

@@ -558,7 +570,7 @@ def list_jobs(
# It would be better if we could just feed an empty dict,
# but this complicates things at the ORM level,
# see https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.or_
records = self.job_repo.list(
records = self._job_repo.list(
skip,
limit,
criteria=[or_(*[JobRecord.job_type == job_type for job_type in job_types])],
@@ -593,7 +605,7 @@ def get_job_result(self, job_id: UUID) -> JobResultResponse:
:rtype: JobResultResponse
:raises JobNotFoundError: if the job does not exist
"""
result_record = self.result_repo.get_by_job_id(job_id)
result_record = self._result_repo.get_by_job_id(job_id)
if result_record is None:
raise JobNotFoundError(job_id) from None

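For context, a minimal sketch (not part of this diff) of how the reworked error handling surfaces to callers. It assumes, as the constructor above suggests, that JobService only stores its collaborators, so plain mocks can stand in for the repositories and services, and that the backend package and the Ray SDK are importable:

from unittest.mock import MagicMock
from uuid import uuid4

from ray.dashboard.modules.job.sdk import JobSubmissionClient

from backend.services.exceptions.job_exceptions import JobNotFoundError, JobUpstreamError
from backend.services.jobs import JobService

# Only the Ray client's behaviour matters here; every other collaborator is a plain mock.
ray_client = MagicMock(spec=JobSubmissionClient)
service = JobService(MagicMock(), MagicMock(), ray_client, MagicMock(), MagicMock(), MagicMock())
job_id = uuid4()

# The Ray dashboard SDK reports a missing job as a RuntimeError mentioning "status code 404";
# the service now maps that to JobNotFoundError, which get_job() converts to the UNRECOVERABLE status.
ray_client.get_job_status.side_effect = RuntimeError("Request failed with status code 404: Job not found.")
try:
    service.get_upstream_job_status(job_id)
except JobNotFoundError:
    print("Ray no longer knows this job")

# Any other failure while talking to Ray is still surfaced as JobUpstreamError.
ray_client.get_job_status.side_effect = RuntimeError("connection refused")
try:
    service.get_upstream_job_status(job_id)
except JobUpstreamError:
    print("upstream error while fetching the job status")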
23 changes: 19 additions & 4 deletions lumigator/backend/backend/tests/conftest.py
@@ -1,9 +1,7 @@
import csv
import io
import json
import logging
import os
import sys
import time
import uuid
from collections.abc import Generator
@@ -31,6 +29,7 @@
)
from lumigator_schemas.models import ModelsResponse
from mlflow.entities import Metric, Param, Run, RunData, RunInfo, RunTag
from ray.dashboard.modules.job.sdk import JobSubmissionClient
from s3fs import S3FileSystem
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session
@@ -451,8 +450,18 @@ def job_record(db_session):


@pytest.fixture(scope="function")
def job_service(db_session, job_repository, result_repository, dataset_service, secret_service, background_tasks):
return JobService(job_repository, result_repository, None, dataset_service, secret_service, background_tasks)
def fake_ray_client() -> JobSubmissionClient:
"""Mocked Ray client for testing."""
return MagicMock(spec=JobSubmissionClient)


@pytest.fixture(scope="function")
def job_service(
db_session, job_repository, result_repository, fake_ray_client, dataset_service, secret_service, background_tasks
):
return JobService(
job_repository, result_repository, fake_ray_client, dataset_service, secret_service, background_tasks
)


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -613,3 +622,9 @@ def fake_mlflow_run_deleted():
run_data = RunData(metrics={}, params={}, tags={})

return Run(run_info=run_info, run_data=run_data)


@pytest.fixture(scope="function")
def valid_job_id() -> UUID:
"""Fixture that returns a random UUID."""
return UUID("d34dbeef-4bea-4d19-ad06-214202165812")
37 changes: 34 additions & 3 deletions lumigator/backend/backend/tests/unit/services/test_job_service.py
@@ -1,21 +1,24 @@
import json
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from uuid import UUID

import loguru
import pytest
import requests_mock
from lumigator_schemas.datasets import DatasetFormat
from lumigator_schemas.jobs import (
JobCreate,
JobInferenceConfig,
JobType,
)
from lumigator_schemas.secrets import SecretUploadRequest
from ray.dashboard.modules.job.common import JobStatus
from ray.job_submission import JobSubmissionClient

from backend.ray_submit.submission import RayJobEntrypoint
from backend.services.exceptions.job_exceptions import JobValidationError
from backend.services.exceptions.job_exceptions import JobNotFoundError, JobUpstreamError, JobValidationError
from backend.services.exceptions.secret_exceptions import SecretNotFoundError
from backend.services.jobs import job_settings_map
from backend.services.jobs import JobService, job_settings_map
from backend.settings import settings
from backend.tests.conftest import TEST_SEQ2SEQ_MODEL

@@ -152,3 +155,31 @@ def test_missing_api_key_in_job_creation(
):
with pytest.raises(JobValidationError):
job_service.create_job(request)


def test_get_upstream_job_status_success(job_service: JobService, valid_job_id: UUID):
# Mock the Ray client's get_job_status method to return a mock response
mock_job_status = MagicMock()
mock_job_status.value = "SUCCEEDED"
job_service._ray_client.get_job_status = MagicMock(return_value=mock_job_status)
status = job_service.get_upstream_job_status(valid_job_id)
assert status == "succeeded"


def test_get_upstream_job_status_job_not_found(job_service: JobService, valid_job_id: UUID):
"""Test for job not found error (404)."""
# This is the current response from the Ray client when a job is not found.
job_service._ray_client.get_job_status.side_effect = RuntimeError(
"Request failed with status code 404: Job not found."
)

with pytest.raises(JobNotFoundError):
job_service.get_upstream_job_status(valid_job_id)


def test_get_upstream_job_status_other_error(job_service: JobService, valid_job_id: UUID):
"""Test for any other error."""
job_service._ray_client.get_job_status.side_effect = RuntimeError("Some other error occurred")

with pytest.raises(JobUpstreamError):
job_service.get_upstream_job_status(valid_job_id)
(file header not rendered; the following diff is for a frontend Vue component)
@@ -26,14 +26,14 @@
<template #body="slotProps">
<div>
<Tag
v-if="retrieveStatus(slotProps.data.id) === WorkflowStatus.SUCCEEDED"
v-if="retrieveStatus(slotProps.data.id) === JobStatus.SUCCEEDED"
severity="success"
rounded
:value="retrieveStatus(slotProps.data.id)"
:pt="{ root: 'l-job-table__tag' }"
/>
<Tag
v-else-if="retrieveStatus(slotProps.data.id) === WorkflowStatus.FAILED"
v-else-if="retrieveStatus(slotProps.data.id) === JobStatus.FAILED"
severity="danger"
rounded
:value="retrieveStatus(slotProps.data.id)"
@@ -70,7 +70,7 @@ import { storeToRefs } from 'pinia'
import { useDatasetStore } from '@/stores/datasetsStore'
import { useSlidePanel } from '@/composables/useSlidePanel'
import { formatDate } from '@/helpers/formatDate'
import { WorkflowStatus } from '@/types/Workflow'
import { JobStatus } from '@/types/Job'

const datasetStore = useDatasetStore()
const { inferenceJobs, hasRunningInferenceJob } = storeToRefs(datasetStore)
14 changes: 6 additions & 8 deletions lumigator/frontend/src/components/datasets/LJobDetails.vue
@@ -41,7 +41,7 @@
size="small"
label="Logs"
aria-label="Logs"
:disabled="currentItemStatus === WorkflowStatus.PENDING"
:disabled="currentItemStatus === JobStatus.PENDING"
style="padding: 0; background: transparent; border: none; font-weight: 400; gap: 4px"
class="l-experiment-details__content-item-logs"
iconClass="logs-btn"
@@ -109,7 +109,7 @@
size="small"
icon="pi pi-external-link"
label="View Results"
:disabled="currentItemStatus !== WorkflowStatus.SUCCEEDED"
:disabled="currentItemStatus !== JobStatus.SUCCEEDED"
@click="showResults"
></Button>
<Button
@@ -119,7 +119,7 @@
size="small"
icon="pi pi-download"
label="Download Results"
:disabled="currentItemStatus !== WorkflowStatus.SUCCEEDED"
:disabled="currentItemStatus !== JobStatus.SUCCEEDED"
@click="emit('l-download-results', selectedJob)"
></Button>
</div>
@@ -133,7 +133,7 @@ import { storeToRefs } from 'pinia'
import Button from 'primevue/button'
import Tag from 'primevue/tag'
import { formatDate } from '@/helpers/formatDate'
import { WorkflowStatus } from '@/types/Workflow'
import { JobStatus } from '@/types/Job'
import { useDatasetStore } from '@/stores/datasetsStore'
import type { Job } from '@/types/Job'

@@ -177,12 +177,10 @@ const isInference = computed(() => {
const tagSeverity = computed(() => {
const status = currentItemStatus.value
switch (status) {
case WorkflowStatus.SUCCEEDED:
case JobStatus.SUCCEEDED:
return 'success'
case WorkflowStatus.FAILED:
case JobStatus.FAILED:
return 'danger'
case WorkflowStatus.INCOMPLETE:
return 'info'
default:
return 'warn'
}
4 changes: 2 additions & 2 deletions lumigator/frontend/src/components/views/LDatasets.vue
@@ -117,7 +117,7 @@ import type { Dataset } from '@/types/Dataset'
import LJobDetails from '../datasets/LJobDetails.vue'
import type { Job } from '@/types/Job'
import { jobsService } from '@/sdk/jobsService'
import { WorkflowStatus } from '@/types/Workflow'
import { JobStatus } from '@/types/Job'
import { datasetsService } from '@/sdk/datasetsService'
import { getAxiosError } from '@/helpers/getAxiosError'
import type { AxiosError } from 'axios'
@@ -210,7 +210,7 @@ watch(
if (newValue) {
retrieveJobLogs()
}
if (newValue?.status === WorkflowStatus.RUNNING) {
if (newValue?.status === JobStatus.RUNNING) {
startPollingForAnnotationJobLogs()
} else if (isPollingForJobLogs.value) {
stopPollingForAnnotationJobLogs()
7 changes: 3 additions & 4 deletions lumigator/frontend/src/stores/datasetsStore.ts
@@ -4,15 +4,14 @@ import { datasetsService } from '@/sdk/datasetsService'

import type { Dataset } from '@/types/Dataset'
import { jobsService } from '@/sdk/jobsService'
import type { Job } from '@/types/Job'
import { WorkflowStatus } from '@/types/Workflow'
import { type Job, JobStatus } from '@/types/Job'
import { calculateDuration } from '@/helpers/calculateDuration'

export const useDatasetStore = defineStore('datasets', () => {
const datasets: Ref<Dataset[]> = ref([])
const selectedDataset: Ref<Dataset | undefined> = ref()

const completedStatus = [WorkflowStatus.SUCCEEDED, WorkflowStatus.FAILED]
const completedStatus = [JobStatus.SUCCEEDED, JobStatus.FAILED]

const jobs: Ref<Job[]> = ref([])
const inferenceJobs: Ref<Job[]> = ref([])
@@ -48,7 +47,7 @@ export const useDatasetStore = defineStore('datasets', () => {
}

const hasRunningInferenceJob = computed(() => {
return inferenceJobs.value.some((job) => job.status === WorkflowStatus.RUNNING)
return inferenceJobs.value.some((job) => job.status === JobStatus.RUNNING)
})

/**