Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 56 additions & 30 deletions lumigator/backend/backend/services/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import csv
import json
import re
from http import HTTPStatus
from io import BytesIO, StringIO
from pathlib import Path
Expand Down Expand Up @@ -31,10 +32,12 @@
JobResultObject,
JobResultResponse,
JobStatus,
JobType,
)
from ray.job_submission import JobSubmissionClient
from s3fs import S3FileSystem
from sqlalchemy.sql.expression import or_
from starlette.datastructures import Headers

from backend.ray_submit.submission import RayJobEntrypoint, submit_ray_job
from backend.records.jobs import JobRecord
Expand Down Expand Up @@ -79,8 +82,7 @@


class JobService:
# set storage path
storage_path = f"s3://{Path(settings.S3_BUCKET) / settings.S3_JOB_RESULTS_PREFIX}/"
"""Job service is responsible for managing jobs in Lumigator."""

NON_TERMINAL_STATUS = [
JobStatus.CREATED.value,
Expand All @@ -93,6 +95,12 @@ class JobService:
TERMINAL_STATUS = [JobStatus.FAILED.value, JobStatus.SUCCEEDED.value, JobStatus.STOPPED.value]
"""list: A list of terminal job statuses."""

SAFE_JOB_NAME_REGEX = re.compile(r"[^\w\-_.]")
"""A regex pattern to match unsafe characters in job names."""

JOB_NAME_REPLACEMENT_CHAR = "-"
"""The character to replace unsafe characters in job names."""

def __init__(
self,
job_repo: JobRepository,
Expand Down Expand Up @@ -186,20 +194,34 @@ def _get_results_s3_key(self, job_id: UUID) -> str:
The S3 key is constructed from:
- settings.S3_JOB_RESULTS_PREFIX: the path where jobs are stored
- settings.S3_JOB_RESULTS_FILENAME: a filename template that is to be formatted with some of
the job record's metadata (e.g. exp name/id)
the job record's metadata (e.g. name/id)

NOTE: The job's name is sanitized to be S3-safe.

:param job_id: The ID of the job to retrieve the S3 key for
:return: The returned string contains the S3 key *excluding the bucket / s3 prefix*,
as it is to be used by the boto3 client which accepts them separately.
:rtype: str
:return: The returned string contains the S3 key *excluding the bucket and s3 prefix*.
:raises JobNotFoundError: If the job does not exist.
"""
record = self._get_job_record(job_id)
if record.name is None:
raise JobValidationError(f"Job {job_id} is missing 'name'") from None

return str(
Path(settings.S3_JOB_RESULTS_PREFIX)
/ settings.S3_JOB_RESULTS_FILENAME.format(job_name=record.name, job_id=record.id)
/ settings.S3_JOB_RESULTS_FILENAME.format(
job_name=self.sanitize_job_name(str(record.name)), job_id=record.id
)
)

def _get_s3_uri(self, job_id: UUID) -> str:
"""Construct a full S3 URI for job result artifacts.

:param job_id: The ID of the job to retrieve the S3 URI for.
:return: The S3 URI for the job results.
:raises JobNotFoundError: If the job does not exist.
"""
return f"s3://{settings.S3_BUCKET}/{self._get_results_s3_key(job_id)}"

def _results_to_binary_file(self, results: dict[str, Any], fields: list[str]) -> BytesIO:
"""Given a JSON string containing inference results and the fields
we want to read from it, generate a binary file (as a BytesIO
Expand All @@ -225,20 +247,19 @@ def _add_dataset_to_db(
s3_file_system: S3FileSystem,
dataset_filename: str,
is_gt_generated: bool = True,
):
) -> UUID:
"""Attempts to add the result of a job (generated dataset) as a new dataset in Lumigator.

:param job_id: The ID of the job, used to identify the S3 path
:param request: The job request containing the dataset and output fields
:param s3_file_system: The S3 filesystem dependency for accessing storage
:return: The ID of the dataset that was created
:raises DatasetNotFoundError: If the dataset in the request does not exist
:raises DatasetSizeError: if the dataset is too large
:raises DatasetInvalidError: if the dataset is invalid
:raises DatasetMissingFieldsError: if the dataset is missing any of the required fields
:raises DatasetUpstreamError: if there is an exception interacting with S3
"""
loguru.logger.info("Adding a new dataset entry to the database...")

# Get the dataset from the S3 bucket
results = self._validate_results(job_id, s3_file_system)

Expand All @@ -261,7 +282,7 @@ def _add_dataset_to_db(
file=bin_data,
size=bin_data_size,
filename=dataset_filename,
headers={"content-type": "text/csv"},
headers=Headers({"content-type": "text/csv"}),
)
dataset_record = self._dataset_service.upload_dataset(
upload_file,
Expand All @@ -272,24 +293,21 @@ def _add_dataset_to_db(
)

loguru.logger.info(f"Dataset '{dataset_filename}' with ID '{dataset_record.id}' added to the database.")
return dataset_record.id

def _validate_results(self, job_id: UUID, s3_file_system: S3FileSystem) -> JobResultObject:
"""Handles the evaluation result for a given job.
"""Retrieves a job's results from S3 and validates they conform to the ``JobResultObject`` schema.

Args:
job_id (UUID): The unique identifier of the job.
s3_file_system (S3FileSystem): The S3 file system object used to interact with the S3 bucket.

Note:
Currently, this function only validates the evaluation result. Future implementations
may include storing the results in a database (e.g., mlflow).
Raises: ``ValidationError`` if the results do not conform to the schema.
"""
loguru.logger.info("Handling evaluation result")

result_key = self._get_results_s3_key(job_id)
# TODO: use a path creation function.
with s3_file_system.open(f"{settings.S3_BUCKET}/{result_key}", "r") as f:
return JobResultObject.model_validate(json.loads(f.read()))
result_path = self._get_s3_uri(job_id)
with s3_file_system.open(result_path, "r") as f:
data = f.read()
return JobResultObject.model_validate_json(data)

def get_upstream_job_status(self, job_id: UUID) -> str:
"""Returns the (lowercase) status of the upstream job.
Expand Down Expand Up @@ -387,16 +405,19 @@ async def handle_annotation_job(self, job_id: UUID, request: JobCreate, max_wait
get added to the dataset db. The job routes that store the results
in the db will add this function as a background task after the job is created.
"""
loguru.logger.info("Handling inference job result")

# Figure out the dataset filename
dataset_filename = self._dataset_service.get_dataset(dataset_id=request.dataset).filename
dataset_filename = Path(dataset_filename).stem
dataset_filename = f"{dataset_filename}-annotated.csv"

job_status = await self.wait_for_job_complete(job_id, max_wait_time_sec)
if job_status == JobStatus.SUCCEEDED.value:
self._add_dataset_to_db(job_id, request, self._dataset_service.s3_filesystem, dataset_filename)
self._add_dataset_to_db(
job_id=job_id,
request=request,
s3_file_system=self._dataset_service.s3_filesystem,
dataset_filename=dataset_filename,
)
else:
loguru.logger.warning(f"Job {job_id} failed, results not stored in DB")

Expand Down Expand Up @@ -436,9 +457,9 @@ def create_job(
# To find the experiment that a job belongs to,
# we'd use https://mlflow.org/docs/latest/python_api/mlflow.client.html#mlflow.client.MlflowClient.search_runs
record = self.job_repo.create(name=request.name, description=request.description, job_type=job_type)

job_result_storage_path = self._get_s3_uri(record.id)
dataset_s3_path = self._dataset_service.get_dataset_s3_path(request.dataset)
job_config = job_settings.generate_config(request, record.id, dataset_s3_path, self.storage_path)
job_config = job_settings.generate_config(request, record.id, dataset_s3_path, job_result_storage_path)

# Build runtime ENV for workers
runtime_env_vars = settings.with_ray_worker_env_vars({"MZAI_JOB_ID": str(record.id)})
Expand All @@ -457,8 +478,6 @@ def create_job(
# command parameters provided via command line to the ray job.
# To do this, we use a dict where keys are parameter names as they'd
# appear on the command line and the values are the respective params.

# ...and use directly Job*Config(request.job.config.model_dump_json())
job_config_args = {
"--config": job_config.model_dump_json(),
}
Expand Down Expand Up @@ -495,7 +514,7 @@ def create_job(
# - annotation jobs do not run in workflows => they trigger dataset saving here at job level
# As JobType.ANNOTATION is not used uniformly throughout our code yet, we rely on the already
# existing `store_to_dataset` parameter to explicitly trigger this in the annotation case
if getattr(request.job_config, "store_to_dataset", False):
if job_type != JobType.EVALUATION and getattr(request.job_config, "store_to_dataset", False):
self.add_background_task(
self._background_tasks,
self.handle_annotation_job,
Expand All @@ -504,7 +523,6 @@ def create_job(
DEFAULT_POST_INFER_JOB_TIMEOUT_SEC,
)

loguru.logger.info("Getting response...")
return JobResponse.model_validate(record)

def get_job(self, job_id: UUID) -> JobResponse:
Expand Down Expand Up @@ -595,3 +613,11 @@ async def get_job_result_download(self, job_id: UUID) -> JobResultDownloadRespon
ExpiresIn=settings.S3_URL_EXPIRATION,
)
return JobResultDownloadResponse(id=job_id, download_url=download_url)

def sanitize_job_name(self, job_name: str) -> str:
"""Sanitize a job name to be S3-safe.

:param job_name: The job name to sanitize.
:return: The sanitized job name.
"""
return re.sub(self.SAFE_JOB_NAME_REGEX, self.JOB_NAME_REPLACEMENT_CHAR, job_name)
57 changes: 23 additions & 34 deletions lumigator/backend/backend/services/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ async def _run_inference_eval_pipeline(

try:
# Attempt to submit the inference job to Ray before we track it in Lumigator.
inference_job = self._job_service.create_job(
job_infer_create,
)
inference_job = self._job_service.create_job(job_infer_create)
except (JobTypeUnsupportedError, SecretNotFoundError) as e:
loguru.logger.error("Workflow pipeline error: Workflow {}. Cannot create inference job: {}", workflow.id, e)
await self._handle_workflow_failure(workflow.id)
Expand Down Expand Up @@ -208,12 +206,12 @@ async def _run_inference_eval_pipeline(
try:
# Inference jobs produce a new dataset
# Add the dataset to the (local) database
self._job_service._add_dataset_to_db(
inference_job.id,
job_infer_create,
self._dataset_service.s3_filesystem,
dataset_filename,
request_dataset.generated,
inference_dataset_id = self._job_service._add_dataset_to_db(
job_id=inference_job.id,
request=job_infer_create,
s3_file_system=self._dataset_service.s3_filesystem,
dataset_filename=dataset_filename,
is_gt_generated=request_dataset.generated,
)
except (
DatasetNotFoundError,
Expand All @@ -233,19 +231,14 @@ async def _run_inference_eval_pipeline(

try:
# log the job to the tracking client
# TODO: Review how JobService._get_job_record works and if it can be re-used/made public.
result_key = str(
Path(settings.S3_JOB_RESULTS_PREFIX)
/ settings.S3_JOB_RESULTS_FILENAME.format(job_name=job_infer_create.name, job_id=inference_job.id)
)
# TODO: Review how JobService._get_s3_uri works and if it can be re-used/made public.
inference_results_path = self._job_service._get_s3_uri(inference_job.id)

# TODO: Review how DatasetService._get_s3_path (and similar) works and if it can be re-used/made public.
dataset_path = self._dataset_service._get_s3_path(result_key)
with self._dataset_service.s3_filesystem.open(dataset_path, "r") as f:
with self._dataset_service.s3_filesystem.open(inference_results_path, "r") as f:
inf_output = JobResultObject.model_validate_json(f.read())
inf_path = f"{settings.S3_BUCKET}/{self._job_service._get_results_s3_key(inference_job.id)}"

inference_job_output = RunOutputs(
parameters={"inference_output_s3_path": inf_path},
parameters={"inference_output_s3_path": inference_results_path},
metrics=inf_output.metrics,
ray_job_id=str(inference_job.id),
)
Expand All @@ -261,8 +254,6 @@ async def _run_inference_eval_pipeline(
return

# FIXME The ray status is now _not enough_ to set the job status,
# use the inference job id to recover the dataset record
dataset_record = self._dataset_service._get_dataset_record_by_job_id(inference_job.id)

job_config = JobEvalConfig()
if request.metrics:
Expand All @@ -275,16 +266,14 @@ async def _run_inference_eval_pipeline(
# prepare the inputs for the evaluation job and pass the id of the new dataset
job_eval_create = JobCreate(
name=f"{request.name}-evaluation",
dataset=dataset_record.id,
dataset=inference_dataset_id,
max_samples=experiment.max_samples,
job_config=job_config,
)

try:
# Attempt to submit the evaluation job before we track it in Lumigator.
evaluation_job = self._job_service.create_job(
job_eval_create,
)
evaluation_job = self._job_service.create_job(job_eval_create)
except (JobTypeUnsupportedError, SecretNotFoundError) as e:
loguru.logger.error(
"Workflow pipeline error: Workflow {}. Cannot create evaluation job: {}", workflow.id, e
Expand All @@ -294,7 +283,10 @@ async def _run_inference_eval_pipeline(

# Track the evaluation job.
eval_run_id = await self._tracking_client.create_job(
request.experiment_id, workflow.id, "evaluation", evaluation_job.id
experiment_id=request.experiment_id,
workflow_id=workflow.id,
name="evaluation",
job_id=evaluation_job.id,
)

try:
Expand Down Expand Up @@ -326,25 +318,22 @@ async def _run_inference_eval_pipeline(
evaluation_job,
)

result_key = str(
Path(settings.S3_JOB_RESULTS_PREFIX)
/ settings.S3_JOB_RESULTS_FILENAME.format(job_name=job_eval_create.name, job_id=evaluation_job.id)
)
with self._dataset_service.s3_filesystem.open(f"{settings.S3_BUCKET}/{result_key}", "r") as f:
# Get the path to the job results stored in S3.
evaluation_result_path = self._job_service._get_s3_uri(evaluation_job.id)

with self._dataset_service.s3_filesystem.open(evaluation_result_path, "r") as f:
eval_output = JobResultObject.model_validate_json(f.read())

# TODO this generic interface should probably be the output type of the eval job but
# we'll make that improvement later
# Get the dataset from the S3 bucket
result_key = self._job_service._get_results_s3_key(evaluation_job.id)

formatted_metrics = self._prepare_metrics(eval_output)

outputs = RunOutputs(
metrics=formatted_metrics,
# eventually this could be an artifact and be stored by the tracking client,
# but we'll keep it as being stored the way it is for right now.
parameters={"eval_output_s3_path": f"{settings.S3_BUCKET}/{result_key}"},
parameters={"eval_output_s3_path": evaluation_result_path},
ray_job_id=str(evaluation_job.id),
)
await self._tracking_client.update_job(eval_run_id, outputs)
Expand Down
Loading