diff --git a/lumigator/backend/backend/services/jobs.py b/lumigator/backend/backend/services/jobs.py index 25dbc065b..64f5e4ed5 100644 --- a/lumigator/backend/backend/services/jobs.py +++ b/lumigator/backend/backend/services/jobs.py @@ -1,6 +1,7 @@ import asyncio import csv import json +import re from http import HTTPStatus from io import BytesIO, StringIO from pathlib import Path @@ -31,10 +32,12 @@ JobResultObject, JobResultResponse, JobStatus, + JobType, ) from ray.job_submission import JobSubmissionClient from s3fs import S3FileSystem from sqlalchemy.sql.expression import or_ +from starlette.datastructures import Headers from backend.ray_submit.submission import RayJobEntrypoint, submit_ray_job from backend.records.jobs import JobRecord @@ -79,8 +82,7 @@ class JobService: - # set storage path - storage_path = f"s3://{Path(settings.S3_BUCKET) / settings.S3_JOB_RESULTS_PREFIX}/" + """Job service is responsible for managing jobs in Lumigator.""" NON_TERMINAL_STATUS = [ JobStatus.CREATED.value, @@ -93,6 +95,12 @@ class JobService: TERMINAL_STATUS = [JobStatus.FAILED.value, JobStatus.SUCCEEDED.value, JobStatus.STOPPED.value] """list: A list of terminal job statuses.""" + SAFE_JOB_NAME_REGEX = re.compile(r"[^\w\-_.]") + """A regex pattern to match unsafe characters in job names.""" + + JOB_NAME_REPLACEMENT_CHAR = "-" + """The character to replace unsafe characters in job names.""" + def __init__( self, job_repo: JobRepository, @@ -186,20 +194,34 @@ def _get_results_s3_key(self, job_id: UUID) -> str: The S3 key is constructed from: - settings.S3_JOB_RESULTS_PREFIX: the path where jobs are stored - settings.S3_JOB_RESULTS_FILENAME: a filename template that is to be formatted with some of - the job record's metadata (e.g. exp name/id) + the job record's metadata (e.g. name/id) + + NOTE: The job's name is sanitized to be S3-safe. :param job_id: The ID of the job to retrieve the S3 key for - :return: The returned string contains the S3 key *excluding the bucket / s3 prefix*, - as it is to be used by the boto3 client which accepts them separately. - :rtype: str + :return: The returned string contains the S3 key *excluding the bucket and s3 prefix*. + :raises JobNotFoundError: If the job does not exist. """ record = self._get_job_record(job_id) + if record.name is None: + raise JobValidationError(f"Job {job_id} is missing 'name'") from None return str( Path(settings.S3_JOB_RESULTS_PREFIX) - / settings.S3_JOB_RESULTS_FILENAME.format(job_name=record.name, job_id=record.id) + / settings.S3_JOB_RESULTS_FILENAME.format( + job_name=self.sanitize_job_name(str(record.name)), job_id=record.id + ) ) + def _get_s3_uri(self, job_id: UUID) -> str: + """Construct a full S3 URI for job result artifacts. + + :param job_id: The ID of the job to retrieve the S3 URI for. + :return: The S3 URI for the job results. + :raises JobNotFoundError: If the job does not exist. + """ + return f"s3://{settings.S3_BUCKET}/{self._get_results_s3_key(job_id)}" + def _results_to_binary_file(self, results: dict[str, Any], fields: list[str]) -> BytesIO: """Given a JSON string containing inference results and the fields we want to read from it, generate a binary file (as a BytesIO @@ -225,20 +247,19 @@ def _add_dataset_to_db( s3_file_system: S3FileSystem, dataset_filename: str, is_gt_generated: bool = True, - ): + ) -> UUID: """Attempts to add the result of a job (generated dataset) as a new dataset in Lumigator. 
:param job_id: The ID of the job, used to identify the S3 path :param request: The job request containing the dataset and output fields :param s3_file_system: The S3 filesystem dependency for accessing storage + :return: The ID of the dataset that was created :raises DatasetNotFoundError: If the dataset in the request does not exist :raises DatasetSizeError: if the dataset is too large :raises DatasetInvalidError: if the dataset is invalid :raises DatasetMissingFieldsError: if the dataset is missing any of the required fields :raises DatasetUpstreamError: if there is an exception interacting with S3 """ - loguru.logger.info("Adding a new dataset entry to the database...") - # Get the dataset from the S3 bucket results = self._validate_results(job_id, s3_file_system) @@ -261,7 +282,7 @@ def _add_dataset_to_db( file=bin_data, size=bin_data_size, filename=dataset_filename, - headers={"content-type": "text/csv"}, + headers=Headers({"content-type": "text/csv"}), ) dataset_record = self._dataset_service.upload_dataset( upload_file, @@ -272,24 +293,21 @@ def _add_dataset_to_db( ) loguru.logger.info(f"Dataset '{dataset_filename}' with ID '{dataset_record.id}' added to the database.") + return dataset_record.id def _validate_results(self, job_id: UUID, s3_file_system: S3FileSystem) -> JobResultObject: - """Handles the evaluation result for a given job. + """Retrieves a job's results from S3 and validates they conform to the ``JobResultObject`` schema. Args: job_id (UUID): The unique identifier of the job. s3_file_system (S3FileSystem): The S3 file system object used to interact with the S3 bucket. - Note: - Currently, this function only validates the evaluation result. Future implementations - may include storing the results in a database (e.g., mlflow). + Raises: ``ValidationError`` if the results do not conform to the schema. """ - loguru.logger.info("Handling evaluation result") - - result_key = self._get_results_s3_key(job_id) - # TODO: use a path creation function. - with s3_file_system.open(f"{settings.S3_BUCKET}/{result_key}", "r") as f: - return JobResultObject.model_validate(json.loads(f.read())) + result_path = self._get_s3_uri(job_id) + with s3_file_system.open(result_path, "r") as f: + data = f.read() + return JobResultObject.model_validate_json(data) def get_upstream_job_status(self, job_id: UUID) -> str: """Returns the (lowercase) status of the upstream job. @@ -387,8 +405,6 @@ async def handle_annotation_job(self, job_id: UUID, request: JobCreate, max_wait get added to the dataset db. The job routes that store the results in the db will add this function as a background task after the job is created. 
""" - loguru.logger.info("Handling inference job result") - # Figure out the dataset filename dataset_filename = self._dataset_service.get_dataset(dataset_id=request.dataset).filename dataset_filename = Path(dataset_filename).stem @@ -396,7 +412,12 @@ async def handle_annotation_job(self, job_id: UUID, request: JobCreate, max_wait job_status = await self.wait_for_job_complete(job_id, max_wait_time_sec) if job_status == JobStatus.SUCCEEDED.value: - self._add_dataset_to_db(job_id, request, self._dataset_service.s3_filesystem, dataset_filename) + self._add_dataset_to_db( + job_id=job_id, + request=request, + s3_file_system=self._dataset_service.s3_filesystem, + dataset_filename=dataset_filename, + ) else: loguru.logger.warning(f"Job {job_id} failed, results not stored in DB") @@ -436,9 +457,9 @@ def create_job( # To find the experiment that a job belongs to, # we'd use https://mlflow.org/docs/latest/python_api/mlflow.client.html#mlflow.client.MlflowClient.search_runs record = self.job_repo.create(name=request.name, description=request.description, job_type=job_type) - + job_result_storage_path = self._get_s3_uri(record.id) dataset_s3_path = self._dataset_service.get_dataset_s3_path(request.dataset) - job_config = job_settings.generate_config(request, record.id, dataset_s3_path, self.storage_path) + job_config = job_settings.generate_config(request, record.id, dataset_s3_path, job_result_storage_path) # Build runtime ENV for workers runtime_env_vars = settings.with_ray_worker_env_vars({"MZAI_JOB_ID": str(record.id)}) @@ -457,8 +478,6 @@ def create_job( # command parameters provided via command line to the ray job. # To do this, we use a dict where keys are parameter names as they'd # appear on the command line and the values are the respective params. - - # ...and use directly Job*Config(request.job.config.model_dump_json()) job_config_args = { "--config": job_config.model_dump_json(), } @@ -495,7 +514,7 @@ def create_job( # - annotation jobs do not run in workflows => they trigger dataset saving here at job level # As JobType.ANNOTATION is not used uniformly throughout our code yet, we rely on the already # existing `store_to_dataset` parameter to explicitly trigger this in the annotation case - if getattr(request.job_config, "store_to_dataset", False): + if job_type != JobType.EVALUATION and getattr(request.job_config, "store_to_dataset", False): self.add_background_task( self._background_tasks, self.handle_annotation_job, @@ -504,7 +523,6 @@ def create_job( DEFAULT_POST_INFER_JOB_TIMEOUT_SEC, ) - loguru.logger.info("Getting response...") return JobResponse.model_validate(record) def get_job(self, job_id: UUID) -> JobResponse: @@ -595,3 +613,11 @@ async def get_job_result_download(self, job_id: UUID) -> JobResultDownloadRespon ExpiresIn=settings.S3_URL_EXPIRATION, ) return JobResultDownloadResponse(id=job_id, download_url=download_url) + + def sanitize_job_name(self, job_name: str) -> str: + """Sanitize a job name to be S3-safe. + + :param job_name: The job name to sanitize. + :return: The sanitized job name. + """ + return re.sub(self.SAFE_JOB_NAME_REGEX, self.JOB_NAME_REPLACEMENT_CHAR, job_name) diff --git a/lumigator/backend/backend/services/workflows.py b/lumigator/backend/backend/services/workflows.py index 4ee058d6c..f5200db8a 100644 --- a/lumigator/backend/backend/services/workflows.py +++ b/lumigator/backend/backend/services/workflows.py @@ -156,9 +156,7 @@ async def _run_inference_eval_pipeline( try: # Attempt to submit the inference job to Ray before we track it in Lumigator. 
- inference_job = self._job_service.create_job( - job_infer_create, - ) + inference_job = self._job_service.create_job(job_infer_create) except (JobTypeUnsupportedError, SecretNotFoundError) as e: loguru.logger.error("Workflow pipeline error: Workflow {}. Cannot create inference job: {}", workflow.id, e) await self._handle_workflow_failure(workflow.id) @@ -208,12 +206,12 @@ async def _run_inference_eval_pipeline( try: # Inference jobs produce a new dataset # Add the dataset to the (local) database - self._job_service._add_dataset_to_db( - inference_job.id, - job_infer_create, - self._dataset_service.s3_filesystem, - dataset_filename, - request_dataset.generated, + inference_dataset_id = self._job_service._add_dataset_to_db( + job_id=inference_job.id, + request=job_infer_create, + s3_file_system=self._dataset_service.s3_filesystem, + dataset_filename=dataset_filename, + is_gt_generated=request_dataset.generated, ) except ( DatasetNotFoundError, @@ -233,19 +231,14 @@ async def _run_inference_eval_pipeline( try: # log the job to the tracking client - # TODO: Review how JobService._get_job_record works and if it can be re-used/made public. - result_key = str( - Path(settings.S3_JOB_RESULTS_PREFIX) - / settings.S3_JOB_RESULTS_FILENAME.format(job_name=job_infer_create.name, job_id=inference_job.id) - ) + # TODO: Review how JobService._get_s3_uri works and if it can be re-used/made public. + inference_results_path = self._job_service._get_s3_uri(inference_job.id) - # TODO: Review how DatasetService._get_s3_path (and similar) works and if it can be re-used/made public. - dataset_path = self._dataset_service._get_s3_path(result_key) - with self._dataset_service.s3_filesystem.open(dataset_path, "r") as f: + with self._dataset_service.s3_filesystem.open(inference_results_path, "r") as f: inf_output = JobResultObject.model_validate_json(f.read()) - inf_path = f"{settings.S3_BUCKET}/{self._job_service._get_results_s3_key(inference_job.id)}" + inference_job_output = RunOutputs( - parameters={"inference_output_s3_path": inf_path}, + parameters={"inference_output_s3_path": inference_results_path}, metrics=inf_output.metrics, ray_job_id=str(inference_job.id), ) @@ -261,8 +254,6 @@ async def _run_inference_eval_pipeline( return # FIXME The ray status is now _not enough_ to set the job status, - # use the inference job id to recover the dataset record - dataset_record = self._dataset_service._get_dataset_record_by_job_id(inference_job.id) job_config = JobEvalConfig() if request.metrics: @@ -275,16 +266,14 @@ async def _run_inference_eval_pipeline( # prepare the inputs for the evaluation job and pass the id of the new dataset job_eval_create = JobCreate( name=f"{request.name}-evaluation", - dataset=dataset_record.id, + dataset=inference_dataset_id, max_samples=experiment.max_samples, job_config=job_config, ) try: # Attempt to submit the evaluation job before we track it in Lumigator. - evaluation_job = self._job_service.create_job( - job_eval_create, - ) + evaluation_job = self._job_service.create_job(job_eval_create) except (JobTypeUnsupportedError, SecretNotFoundError) as e: loguru.logger.error( "Workflow pipeline error: Workflow {}. Cannot create evaluation job: {}", workflow.id, e @@ -294,7 +283,10 @@ async def _run_inference_eval_pipeline( # Track the evaluation job. 
eval_run_id = await self._tracking_client.create_job( - request.experiment_id, workflow.id, "evaluation", evaluation_job.id + experiment_id=request.experiment_id, + workflow_id=workflow.id, + name="evaluation", + job_id=evaluation_job.id, ) try: @@ -326,17 +318,14 @@ async def _run_inference_eval_pipeline( evaluation_job, ) - result_key = str( - Path(settings.S3_JOB_RESULTS_PREFIX) - / settings.S3_JOB_RESULTS_FILENAME.format(job_name=job_eval_create.name, job_id=evaluation_job.id) - ) - with self._dataset_service.s3_filesystem.open(f"{settings.S3_BUCKET}/{result_key}", "r") as f: + # Get the path to the job results stored in S3. + evaluation_result_path = self._job_service._get_s3_uri(evaluation_job.id) + + with self._dataset_service.s3_filesystem.open(evaluation_result_path, "r") as f: eval_output = JobResultObject.model_validate_json(f.read()) # TODO this generic interface should probably be the output type of the eval job but # we'll make that improvement later - # Get the dataset from the S3 bucket - result_key = self._job_service._get_results_s3_key(evaluation_job.id) formatted_metrics = self._prepare_metrics(eval_output) @@ -344,7 +333,7 @@ async def _run_inference_eval_pipeline( metrics=formatted_metrics, # eventually this could be an artifact and be stored by the tracking client, # but we'll keep it as being stored the way it is for right now. - parameters={"eval_output_s3_path": f"{settings.S3_BUCKET}/{result_key}"}, + parameters={"eval_output_s3_path": evaluation_result_path}, ray_job_id=str(evaluation_job.id), ) await self._tracking_client.update_job(eval_run_id, outputs) diff --git a/lumigator/backend/backend/tests/integration/api/routes/test_api_workflows.py b/lumigator/backend/backend/tests/integration/api/routes/test_api_workflows.py index 80576bbdf..adf99fcb8 100644 --- a/lumigator/backend/backend/tests/integration/api/routes/test_api_workflows.py +++ b/lumigator/backend/backend/tests/integration/api/routes/test_api_workflows.py @@ -5,6 +5,7 @@ import pytest import requests from fastapi.testclient import TestClient +from httpx import HTTPStatusError, RequestError from inference.schemas import GenerationConfig, InferenceJobConfig, InferenceServerConfig from loguru import logger from lumigator_schemas.datasets import DatasetFormat, DatasetResponse @@ -23,10 +24,12 @@ from lumigator_schemas.secrets import SecretUploadRequest from lumigator_schemas.tasks import TaskType from lumigator_schemas.workflows import WorkflowDetailsResponse, WorkflowResponse, WorkflowStatus -from pydantic import PositiveInt +from pydantic import PositiveInt, ValidationError from backend.main import app from backend.tests.conftest import ( + MAX_POLLS, + POLL_WAIT_TIME, TEST_CAUSAL_MODEL, TEST_SEQ2SEQ_MODEL, wait_for_job, @@ -306,8 +309,10 @@ def retrieve_and_validate_workflow_logs(local_client: TestClient, workflow_id): logs = JobLogsResponse.model_validate(logs_job_response.json()) assert logs.logs is not None assert "Inference results stored at" in logs.logs - assert "Storing evaluation results into" in logs.logs - assert logs.logs.index("Inference results stored at") < logs.logs.index("Storing evaluation results into") + assert "Storing evaluation results to" in logs.logs + assert "Storing evaluation results for S3 to" in logs.logs + assert logs.logs.index("Inference results stored at") < logs.logs.index("Storing evaluation results to") + assert logs.logs.index("Inference results stored at") < logs.logs.index("Storing evaluation results for S3 to") def delete_experiment_and_validate(local_client: 
TestClient, experiment_id): @@ -374,13 +379,13 @@ def test_full_experiment_launch( workflow_1 = run_workflow(local_client, experiment_id, "Workflow_1", model) workflow_1_details = wait_for_workflow_complete(local_client, workflow_1.id) assert workflow_1_details - assert len(workflow_1_details.artifacts_download_url) + assert workflow_1_details.artifacts_download_url check_artifacts_times(workflow_1_details.artifacts_download_url) validate_experiment_results(local_client, experiment_id, workflow_1_details) workflow_2 = run_workflow(local_client, experiment_id, "Workflow_2", model) workflow_2_details = wait_for_workflow_complete(local_client, workflow_2.id) assert workflow_2_details - assert len(workflow_2_details.artifacts_download_url) + assert workflow_2_details.artifacts_download_url check_artifacts_times(workflow_2_details.artifacts_download_url) list_experiments(local_client) validate_updated_experiment_results(local_client, experiment_id, workflow_1_details, workflow_2_details) @@ -437,21 +442,73 @@ def test_job_non_existing(local_client: TestClient, dependency_overrides_service assert response.json()["detail"] == f"Job with ID {non_existing_id} not found" -def wait_for_workflow_complete(local_client: TestClient, workflow_id: UUID): - workflow_status = WorkflowStatus.CREATED - workflow_details = None +def wait_for_workflow_complete(local_client: TestClient, workflow_id: UUID) -> WorkflowDetailsResponse | None: + """Wait for the workflow to complete, including post-completion processing for successful + workflows to create compiled results. - for _ in range(1, 300): - time.sleep(1) - workflow_details = WorkflowDetailsResponse.model_validate(local_client.get(f"/workflows/{workflow_id}").json()) - workflow_status = WorkflowStatus(workflow_details.status) - if workflow_status in [WorkflowStatus.SUCCEEDED, WorkflowStatus.FAILED]: - logger.info(f"Workflow status: {workflow_status}") - break - if workflow_status not in [WorkflowStatus.SUCCEEDED, WorkflowStatus.FAILED]: - raise Exception(f"Stopped, job remains in {workflow_status} status") + Makes a total of ``MAX_POLLS`` (as configured in the ``conftest.py``). + Sleeps for ``POLL_WAIT_TIME`` seconds between each poll (as configured in the ``conftest.py``). - return workflow_details + :param local_client: The test client. + :param workflow_id: The workflow ID of the workflow to wait for. + :return: The workflow details, or ``None`` if the workflow did not reach the required completed state + within the maximum number of polls. + """ + attempt = 0 + max_attempts = MAX_POLLS + wait_duration = POLL_WAIT_TIME + + while attempt < max_attempts: + # Allow the waiting interval if we're coming around again. + if attempt > 0: + time.sleep(wait_duration) + + attempt += 1 + try: + response = local_client.get(f"/workflows/{workflow_id}") + response.raise_for_status() + # Validation failure will raise an exception (``ValidationError``) which is fine + # as if we're getting a response we expect it to be valid. + workflow = WorkflowDetailsResponse.model_validate(response.json()) + except (RequestError, HTTPStatusError) as e: + # Log the error but allow us to retry the request until we've maxed out our attempts. + logger.warning(f"Workflow: {workflow_id}, request: ({attempt}/{max_attempts}) failed: {e}") + continue + + # Check if the workflow is not in a terminal state. 
+        if workflow.status not in {WorkflowStatus.SUCCEEDED, WorkflowStatus.FAILED}:
+            logger.info(
+                f"Workflow: {workflow_id}, "
+                f"request: ({attempt}/{max_attempts}), "
+                f"status: {workflow.status}, "
+                f"not in terminal state"
+            )
+            continue
+
+        # If the workflow failed, we can stop checking.
+        if workflow.status == WorkflowStatus.FAILED:
+            return workflow
+
+        # The workflow was successful, but we need the artifacts download url to be populated.
+        if not workflow.artifacts_download_url:
+            logger.info(
+                f"Workflow: {workflow_id}, "
+                f"request: ({attempt}/{max_attempts}), "
+                f"status: {workflow.status}, "
+                f"artifacts not ready"
+            )
+            continue
+
+        logger.info(
+            f"Workflow: {workflow_id}, "
+            f"request: ({attempt}/{max_attempts}), "
+            f"status: {workflow.status}, "
+            f"succeeded and processed"
+        )
+        return workflow
+
+    # Couldn't get the workflow details within the maximum number of polls.
+    return None


 def _test_launch_job_with_secret(
diff --git a/lumigator/backend/backend/tests/unit/services/test_job_service.py b/lumigator/backend/backend/tests/unit/services/test_job_service.py
index f23559e4d..20d49a1c0 100644
--- a/lumigator/backend/backend/tests/unit/services/test_job_service.py
+++ b/lumigator/backend/backend/tests/unit/services/test_job_service.py
@@ -35,7 +35,7 @@ def test_set_null_inference_job_params(job_record, job_service):
     ):
         dataset_s3_path = job_service._dataset_service.get_dataset_s3_path(request.dataset)
         job_config = job_settings_map[JobType.INFERENCE].generate_config(
-            request, request.dataset, dataset_s3_path, job_service.storage_path
+            request, request.dataset, dataset_s3_path, "s3://lumigator-storage/path/to/results.json"
         )

         assert job_config.job.max_samples == -1
@@ -56,7 +56,7 @@ def test_set_explicit_inference_job_params(job_record, job_service):
     ):
         dataset_s3_path = job_service._dataset_service.get_dataset_s3_path(request.dataset)
         job_config = job_settings_map[JobType.INFERENCE].generate_config(
-            request, request.dataset, dataset_s3_path, job_service.storage_path
+            request, request.dataset, dataset_s3_path, "s3://lumigator-storage/path/to/results.json"
         )

         assert job_config.job.max_samples == 10
diff --git a/lumigator/backend/backend/tracking/mlflow.py b/lumigator/backend/backend/tracking/mlflow.py
index d14efd601..f63b940fe 100644
--- a/lumigator/backend/backend/tracking/mlflow.py
+++ b/lumigator/backend/backend/tracking/mlflow.py
@@ -294,7 +294,7 @@ async def get_workflow(self, workflow_id: str) -> WorkflowDetailsResponse | None
                 # download the file
                 # get the file from the S3 bucket
                 with self._s3_file_system.open(f"{param['value']}") as f:
-                    job_results = JobResultObject.model_validate(json.loads(f.read()))
+                    job_results = JobResultObject.model_validate_json(f.read())
                 # if any keys are the same, log a warning and then overwrite the key
                 for job_result_item in job_results:
                     if job_result_item[1] is None:
diff --git a/lumigator/jobs/evaluator/evaluator.py b/lumigator/jobs/evaluator/evaluator.py
index 063ea5f63..68210a92d 100644
--- a/lumigator/jobs/evaluator/evaluator.py
+++ b/lumigator/jobs/evaluator/evaluator.py
@@ -1,8 +1,10 @@
+import argparse
 import json
 import os
+import re
 from pathlib import Path
+from uuid import UUID

-import click
 import s3fs
 from datasets import load_from_disk
 from datasets.features.features import Value
@@ -14,41 +16,49 @@
 DEEPEVAL_CONFIG_FILENAME = ".deepeval"

+SAFE_JOB_NAME_REGEX = re.compile(r"[^\w\-_.]")
+
+JOB_NAME_REPLACEMENT_CHAR = "-"
+

 def save_to_disk(local_path: Path, data: JobOutput):
-    logger.info(f"Storing evaluation 
results into {local_path}...") + logger.info(f"Storing evaluation results to {local_path}...") local_path.parent.mkdir(exist_ok=True, parents=True) with local_path.open("w") as f: f.write(data.model_dump_json()) -def save_to_s3(config: EvalJobConfig, local_path: Path, storage_path: str): +def save_to_s3(job_id: UUID, job_name: str, local_path: Path, storage_path: str): s3 = s3fs.S3FileSystem() + # If there's no 'filename' let's build a path. if storage_path.endswith("/"): - storage_path = "s3://" + str(Path(storage_path[5:]) / config.name / "results.json") - logger.info(f"Storing evaluation results into {storage_path}...") + safe_name = sanitize_job_name(job_name) + storage_path = storage_path.removeprefix("s3://").rstrip("/") + storage_path = f"s3://{storage_path}/{safe_name}/{job_id}/results.json" + logger.info(f"Storing evaluation results for S3 to {storage_path}...") s3.put_file(local_path, storage_path) -def save_outputs(config: EvalJobConfig, eval_results: JobOutput) -> Path: - storage_path = config.evaluation.storage_path +def save_outputs(job_id: UUID, job_name: str, storage_path: str, eval_results: JobOutput) -> str | None: + # Sanitize name to be S3-safe. + safe_name = sanitize_job_name(job_name) # generate local temp file ANYWAY: # - if storage_path is not provided, it will be stored and kept into a default dir # - if storage_path is provided AND saving to S3 is successful, local file is deleted - local_path = Path(Path.home() / ".lumigator" / "results" / config.name / "results.json") + local_path = Path(Path.home() / ".lumigator" / "results" / safe_name / str(job_id) / "results.json") try: save_to_disk(local_path, eval_results) # copy to s3 and return path if storage_path is not None and storage_path.startswith("s3://"): - save_to_s3(config, local_path, storage_path) + save_to_s3(job_id, job_name, local_path, storage_path) Path.unlink(local_path) Path.rmdir(local_path.parent) return storage_path else: - return local_path + return str(local_path) except Exception as e: logger.error(e) @@ -74,7 +84,7 @@ def run_eval_metrics(examples: list, predictions: list, ground_truth: list, eval return EvalJobMetrics.model_validate(evaluation_results) -def run_eval(config: EvalJobConfig) -> Path: +def run_eval(config: EvalJobConfig, job_id: UUID) -> str | None: max_samples = config.evaluation.max_samples # Load dataset given its URI @@ -112,8 +122,13 @@ def replace_none_with_empty(row): max_samples = len(dataset) dataset = dataset.select(range(max_samples)) + metric_results: EvalJobMetrics + evaluation_time: float metric_results, evaluation_time = run_eval_metrics( - dataset["examples"], dataset[config.predictions_field], dataset["ground_truth"], config.evaluation.metrics + examples=dataset["examples"], + predictions=dataset[config.predictions_field], + ground_truth=dataset["ground_truth"], + evaluation_metrics=config.evaluation.metrics, ) # add input data to results dict @@ -130,39 +145,47 @@ def replace_none_with_empty(row): parameters=config, artifacts=artifacts, ) - output_path = save_outputs(config, evaluation_results) + output_path = save_outputs(job_id, config.name, config.evaluation.storage_path, evaluation_results) return output_path -@click.command() -@click.option("--config") -def eval_command(config: str) -> None: - config_dict = json.loads(config) - logger.info(f"{config_dict}") - config_model = EvalJobConfig(**config_dict) - - # NOTE: Temporary solution to avoid API key issues in G-Eval which defaults to calling OpenAI. 
- if "g_eval_summarization" in config_model.evaluation.metrics and (api_key := os.environ.get("api_key")): - os.environ["OPENAI_API_KEY"] = api_key - - if config_model.evaluation.llm_as_judge is not None: +def prepare_deepeval_config_file(config: EvalJobConfig) -> None: + if config.evaluation.llm_as_judge is not None: # create a .deepeval config file if an ollama model is specified deepeval_config = { - "LOCAL_MODEL_NAME": config_model.evaluation.llm_as_judge.model_name, - "LOCAL_MODEL_BASE_URL": config_model.evaluation.llm_as_judge.model_base_url, + "LOCAL_MODEL_NAME": config.evaluation.llm_as_judge.model_name, + "LOCAL_MODEL_BASE_URL": config.evaluation.llm_as_judge.model_base_url, "USE_LOCAL_MODEL": "YES", "USE_AZURE_OPENAI": "NO", - "LOCAL_MODEL_API_KEY": config_model.evaluation.llm_as_judge.model_api_key, + "LOCAL_MODEL_API_KEY": config.evaluation.llm_as_judge.model_api_key, } with Path(DEEPEVAL_CONFIG_FILENAME).open("w") as f: - json.dump(deepeval_config, f) + json.dump(deepeval_config, f, indent=4) else: # otherwise, make sure we'll start without a .deepeval config file Path(DEEPEVAL_CONFIG_FILENAME).unlink(missing_ok=True) - run_eval(config_model) + +def sanitize_job_name(job_name: str) -> str: + """Sanitize a job name to be S3-safe.""" + return re.sub(SAFE_JOB_NAME_REGEX, JOB_NAME_REPLACEMENT_CHAR, job_name) if __name__ == "__main__": - logger.info("Starting evaluation job...") - eval_command() + parser = argparse.ArgumentParser() + parser.add_argument("--config", type=str, help="Configuration in JSON format") + args = parser.parse_args() + + if not args.config: + parser.print_help() # Print the usage message and exit + err_str = "No input configuration provided. Please pass one using the --config flag" + logger.error(err_str) + else: + config = EvalJobConfig.model_validate_json(args.config) + # NOTE: Temporary solution to avoid API key issues in G-Eval which defaults to calling OpenAI. 
+ if "g_eval_summarization" in config.evaluation.metrics and (api_key := os.environ.get("api_key")): + os.environ["OPENAI_API_KEY"] = api_key + + job_id: UUID = UUID(os.environ.get("MZAI_JOB_ID")) + prepare_deepeval_config_file(config) + run_eval(config, job_id) diff --git a/lumigator/jobs/evaluator/tests/test_eval_metrics.py b/lumigator/jobs/evaluator/tests/test_eval_metrics.py index a4df67d9c..c077a1461 100644 --- a/lumigator/jobs/evaluator/tests/test_eval_metrics.py +++ b/lumigator/jobs/evaluator/tests/test_eval_metrics.py @@ -1,5 +1,6 @@ import shutil from pathlib import Path +from uuid import UUID import numpy as np import pytest @@ -71,10 +72,11 @@ def test_empty_fields_cast_as_float64(): test_path = test_path_csv.with_suffix("") csv = load_dataset("csv", data_files=str(test_path_csv), split="train") csv.save_to_disk(test_path) + non_existing_id = UUID("d34dbeef-4bea-4d19-ad06-214202165812") eval = EvalJobConfig( name="test", dataset=DatasetConfig(path=str(test_path)), evaluation=EvaluationConfig(metrics=["rouge"], storage_path="/tmp/test_empty_fields_cast_as_float64.metrics"), ) - run_eval(eval) + run_eval(eval, non_existing_id) shutil.rmtree(test_path) diff --git a/lumigator/jobs/inference/inference.py b/lumigator/jobs/inference/inference.py index e24d36816..86c3635ed 100644 --- a/lumigator/jobs/inference/inference.py +++ b/lumigator/jobs/inference/inference.py @@ -2,7 +2,9 @@ import argparse import os +import re from pathlib import Path +from uuid import UUID import s3fs from dataset import create_dataloader @@ -18,6 +20,10 @@ from schemas import AverageInferenceMetrics, InferenceJobOutput, JobOutput, PredictionResult +SAFE_JOB_NAME_REGEX = re.compile(r"[^\w\-_.]") + +JOB_NAME_REPLACEMENT_CHAR = "-" + @timer def predict(dataloader: DataLoader, model_client: BaseModelClient) -> list[PredictionResult]: @@ -30,49 +36,50 @@ def predict(dataloader: DataLoader, model_client: BaseModelClient) -> list[Predi return predictions -def save_to_disk(local_path: Path, results: JobOutput): - logger.info(f"Storing into {local_path}...") +def save_to_disk(local_path: Path, data: JobOutput): + logger.info(f"Storing inference results to {local_path}...") local_path.parent.mkdir(exist_ok=True, parents=True) with local_path.open("w") as f: - f.write(results.model_dump_json()) + f.write(data.model_dump_json()) -def save_to_s3(config: InferenceJobConfig, local_path: Path, storage_path: str): +def save_to_s3(job_id: UUID, job_name: str, local_path: Path, storage_path: str): s3 = s3fs.S3FileSystem() + # If there's no 'filename' let's build a path. if storage_path.endswith("/"): - storage_path = "s3://" + str(Path(storage_path[5:]) / config.name / "results.json") - logger.info(f"Storing into {storage_path}...") + safe_name = sanitize_job_name(job_name) + storage_path = storage_path.removeprefix("s3://").rstrip("/") + storage_path = f"s3://{storage_path}/{safe_name}/{job_id}/results.json" + logger.info(f"Storing inference results for S3 to {storage_path}...") s3.put_file(local_path, storage_path) -def save_outputs(config: InferenceJobConfig, results: JobOutput) -> Path: - storage_path = config.job.storage_path +def save_outputs(storage_path: str, job_id: UUID, job_name: str, results: JobOutput) -> str | None: + # Sanitize name to be S3-safe. 
+ safe_name = sanitize_job_name(job_name) # generate local temp file ANYWAY: # - if storage_path is not provided, it will be stored and kept into a default dir # - if storage_path is provided AND saving to S3 is successful, local file is deleted - local_path = Path(Path.home() / ".lumigator" / "results" / config.name / "results.json") + local_path = Path(Path.home() / ".lumigator" / "results" / safe_name / str(job_id) / "results.json") try: save_to_disk(local_path, results) # copy to s3 and return path - if storage_path is not None and storage_path.startswith("s3://"): - save_to_s3(config, local_path, storage_path) + if storage_path and storage_path.startswith("s3://"): + save_to_s3(job_id, job_name, local_path, storage_path) Path.unlink(local_path) Path.rmdir(local_path.parent) return storage_path else: - return local_path + return str(local_path) except Exception as e: logger.error(e) -def run_inference(config: InferenceJobConfig, api_key: str | None = None) -> Path: - # initialize output dictionary - output = {} - +def run_inference(config: InferenceJobConfig, job_id: UUID, api_key: str | None = None) -> str | None: # Load dataset given its URI dataset = load_from_disk(config.dataset.path) @@ -102,16 +109,21 @@ def run_inference(config: InferenceJobConfig, api_key: str | None = None) -> Pat else: raise NotImplementedError("Inference pipeline not supported.") + prediction_results: list[PredictionResult] + inference_time: float + prediction_results, inference_time = predict(dataloader_iterable, model_client) + # We keep any columns that were already there (not just the original input # samples, but also past predictions under another column name) - output.update(dataset.to_dict()) + output = dataset.to_dict() # We are trusting the user: if the dataset already had a column with the output_field # they selected, we overwrite it with the values from our inference. if config.job.output_field in dataset.column_names: logger.warning(f"Overwriting {config.job.output_field}") - prediction_results, inference_time = predict(dataloader_iterable, model_client) + # NOTE: Are we certain that a dataset wouldn't have fields other than the output_field? + # If it can then we are potentially overwriting other fields too. 
     output[config.job.output_field] = [p.prediction for p in prediction_results]
     output["reasoning"] = [p.reasoning for p in prediction_results]
     output["inference_metrics"] = [p.metrics for p in prediction_results]
@@ -119,32 +131,52 @@
     output["inference_time"] = inference_time

     artifacts = InferenceJobOutput.model_validate(output)
-    if all(p.metrics is not None for p in prediction_results):
-        avg_prompt_tokens = sum([p.metrics.prompt_tokens for p in prediction_results]) / len(prediction_results)
-        avg_total_tokens = sum([p.metrics.total_tokens for p in prediction_results]) / len(prediction_results)
-        avg_completion_tokens = sum([p.metrics.completion_tokens for p in prediction_results]) / len(prediction_results)
-        avg_reasoning_tokens = sum([p.metrics.reasoning_tokens for p in prediction_results]) / len(prediction_results)
-        avg_answer_tokens = sum([p.metrics.answer_tokens for p in prediction_results]) / len(prediction_results)
-        metrics = AverageInferenceMetrics(
+    metrics = _calculate_average_metrics(prediction_results)
+    results = JobOutput(artifacts=artifacts, parameters=config, metrics=metrics)
+
+    output_path = save_outputs(config.job.storage_path, job_id, config.name, results)
+    return output_path
+
+
+def _calculate_average_metrics(prediction_results: list[PredictionResult]) -> AverageInferenceMetrics | None:
+    """Calculate the average metrics from a list of prediction results.
+
+    :param prediction_results: List of prediction results to calculate the average metrics from.
+    :returns: ``AverageInferenceMetrics`` object containing the average metrics,
+              or None if the prediction results don't contain any metrics.
+    :raises ValueError: If some prediction results have metrics and some don't.
+    """
+    if not prediction_results:
+        return None
+
+    all_metrics_present = all(p.metrics for p in prediction_results)
+
+    if any(p.metrics for p in prediction_results) != all_metrics_present:
+        raise ValueError("Prediction result 'metrics' must be present in ALL results or NONE, but not in SOME.")
+
+    # Only attempt to calculate average metrics if we have a metric for EVERY prediction result.
+ if all_metrics_present: + total_results = len(prediction_results) + avg_prompt_tokens = sum(p.metrics.prompt_tokens for p in prediction_results) / total_results + avg_total_tokens = sum(p.metrics.total_tokens for p in prediction_results) / total_results + avg_completion_tokens = sum(p.metrics.completion_tokens for p in prediction_results) / total_results + avg_reasoning_tokens = sum(p.metrics.reasoning_tokens for p in prediction_results) / total_results + avg_answer_tokens = sum(p.metrics.answer_tokens for p in prediction_results) / total_results + + return AverageInferenceMetrics( avg_prompt_tokens=avg_prompt_tokens, avg_total_tokens=avg_total_tokens, avg_completion_tokens=avg_completion_tokens, avg_reasoning_tokens=avg_reasoning_tokens, avg_answer_tokens=avg_answer_tokens, ) - results = JobOutput( - metrics=metrics, - parameters=config, - artifacts=artifacts, - ) - else: - results = JobOutput( - parameters=config, - artifacts=artifacts, - ) - output_path = save_outputs(config, results) - return output_path + return None + + +def sanitize_job_name(job_name: str) -> str: + """Sanitize a job name to be S3-safe.""" + return re.sub(SAFE_JOB_NAME_REGEX, JOB_NAME_REPLACEMENT_CHAR, job_name) if __name__ == "__main__": @@ -160,5 +192,8 @@ def run_inference(config: InferenceJobConfig, api_key: str | None = None) -> Pat config = InferenceJobConfig.model_validate_json(args.config) # Attempt to retrieve the API key from the environment for use with the clients. api_key = os.environ.get("api_key") - result_dataset_path = run_inference(config, api_key) + # Pull the job ID from the runtime environment. + job_id: UUID = UUID(os.environ.get("MZAI_JOB_ID")) + + result_dataset_path = run_inference(config, job_id, api_key) logger.info(f"Inference results stored at {result_dataset_path}") diff --git a/lumigator/jobs/inference/inference_config.py b/lumigator/jobs/inference/inference_config.py index 946c086bc..b24359398 100644 --- a/lumigator/jobs/inference/inference_config.py +++ b/lumigator/jobs/inference/inference_config.py @@ -4,7 +4,7 @@ import torch from huggingface_hub.utils import validate_repo_id from loguru import logger -from pydantic import AfterValidator, BeforeValidator, ConfigDict, Field, PositiveInt, computed_field +from pydantic import AfterValidator, BeforeValidator, ConfigDict, Field, computed_field from schemas import DatasetConfig from schemas import GenerationConfig as BaseGenerationConfig @@ -69,11 +69,7 @@ class Accelerator(str, Enum): class JobConfig(BaseJobConfig): max_samples: int = -1 # set to all samples by default - batch_size: PositiveInt = 1 - storage_path: str output_field: str = "predictions" - enable_tqdm: bool = True - model_config = ConfigDict(extra="forbid") class InferenceServerConfig(BaseInferenceServerConfig): diff --git a/lumigator/jobs/inference/schemas.py b/lumigator/jobs/inference/schemas.py index 3049be5aa..2525e2cab 100644 --- a/lumigator/jobs/inference/schemas.py +++ b/lumigator/jobs/inference/schemas.py @@ -4,7 +4,7 @@ """ from lumigator_schemas.tasks import SummarizationTaskDefinition, TaskDefinition, TaskType -from pydantic import BaseModel, ConfigDict, Field, PositiveInt +from pydantic import BaseModel, ConfigDict, Field, PositiveInt, model_validator class DatasetConfig(BaseModel): @@ -71,8 +71,8 @@ class InferenceMetrics(BaseModel): prompt_tokens: int total_tokens: int completion_tokens: int - reasoning_tokens: int | None = None - answer_tokens: int | None = None + reasoning_tokens: int + answer_tokens: int class AverageInferenceMetrics(BaseModel): @@ 
-91,17 +91,25 @@ class InferenceJobOutput(BaseModel): ground_truth: list | None = None model: str inference_time: float - inference_metrics: list[InferenceMetrics] | list[None] = None + inference_metrics: list[InferenceMetrics | None] = [] class PredictionResult(BaseModel): model_config = ConfigDict(extra="forbid") prediction: str reasoning: str | None = None - metrics: InferenceMetrics = None + metrics: InferenceMetrics | None = None class JobOutput(BaseModel): - metrics: AverageInferenceMetrics | None = {} + metrics: AverageInferenceMetrics | dict = {} artifacts: InferenceJobOutput parameters: InferenceJobConfig + + @model_validator(mode="before") + @classmethod + def ensure_metrics_default(cls, values): + # If metrics is set to None, set it to an empty dict. + if values.get("metrics") is None: + values["metrics"] = {} + return values diff --git a/lumigator/jobs/inference/tests/test_inference.py b/lumigator/jobs/inference/tests/test_inference.py new file mode 100644 index 000000000..00fa5a7a1 --- /dev/null +++ b/lumigator/jobs/inference/tests/test_inference.py @@ -0,0 +1,71 @@ +import pytest +from inference import _calculate_average_metrics + +from schemas import AverageInferenceMetrics, InferenceMetrics, PredictionResult + + +def _make_result_with_metrics( + prediction: str = "test", + reasoning: str = None, + prompt=10, + total=20, + completion=5, + reasoning_tokens=3, + answer=2, +): + return PredictionResult( + prediction=prediction, + reasoning=reasoning, + metrics=InferenceMetrics( + prompt_tokens=prompt, + total_tokens=total, + completion_tokens=completion, + reasoning_tokens=reasoning_tokens, + answer_tokens=answer, + ), + ) + + +def _make_result_without_metrics(prediction: str = "test", reasoning: str = None): + return PredictionResult( + prediction=prediction, + reasoning=reasoning, + metrics=None, + ) + + +class TestCalculateAverageMetrics: + def test_returns_average_when_all_have_metrics(self): + results = [ + _make_result_with_metrics(prompt=10, total=20, completion=5, reasoning_tokens=3, answer=2), + _make_result_with_metrics(prompt=30, total=40, completion=15, reasoning_tokens=9, answer=6), + ] + + avg = _calculate_average_metrics(results) + + assert isinstance(avg, AverageInferenceMetrics) + assert avg.avg_prompt_tokens == 20.0 + assert avg.avg_total_tokens == 30.0 + assert avg.avg_completion_tokens == 10.0 + assert avg.avg_reasoning_tokens == 6.0 + assert avg.avg_answer_tokens == 4.0 + + def test_returns_none_when_none_have_metrics(self): + results = [ + _make_result_without_metrics(), + _make_result_without_metrics(), + ] + + assert _calculate_average_metrics(results) is None + + def test_raises_when_some_have_metrics_and_some_dont(self): + results = [ + _make_result_with_metrics(), + _make_result_without_metrics(), + ] + + with pytest.raises(ValueError, match="Prediction result 'metrics' must be present in ALL results or NONE"): + _calculate_average_metrics(results) + + def test_handles_empty_list(self): + assert _calculate_average_metrics([]) is None diff --git a/lumigator/schemas/lumigator_schemas/jobs.py b/lumigator/schemas/lumigator_schemas/jobs.py index 93e62091d..1879a240c 100644 --- a/lumigator/schemas/lumigator_schemas/jobs.py +++ b/lumigator/schemas/lumigator_schemas/jobs.py @@ -113,7 +113,7 @@ class DeepEvalLocalModelConfig(BaseModel): class JobEvalConfig(BaseJobConfig): job_type: Literal[JobType.EVALUATION] = JobType.EVALUATION - # NOTE: If changing the default metrics, please ensure that they do not include + # NOTE: If changing the default metrics, please 
ensure that they do not include # any requirements for external API calls that require an API key to be configured. metrics: list[str] = ["rouge", "meteor", "bertscore", "bleu"] llm_as_judge: DeepEvalLocalModelConfig | None = None @@ -258,7 +258,7 @@ class JobResultObject(BaseModel): it should be accepted by the backend. """ - model_config = ConfigDict(extra="forbid") + model_config = ConfigDict(extra="ignore") metrics: dict = {} parameters: dict = {} artifacts: dict = {} diff --git a/lumigator/sdk/tests/integration/test_scenarios.py b/lumigator/sdk/tests/integration/test_scenarios.py index 599b74a05..4190e85a5 100644 --- a/lumigator/sdk/tests/integration/test_scenarios.py +++ b/lumigator/sdk/tests/integration/test_scenarios.py @@ -339,9 +339,13 @@ def test_create_exp_workflow_check_results( assert logs_response is not None assert logs_response.logs is not None assert "Inference results stored at" in logs_response.logs - assert "Storing evaluation results into" in logs_response.logs + assert "Storing evaluation results to" in logs_response.logs + assert "Storing evaluation results for S3 to" in logs_response.logs assert logs_response.logs.index("Inference results stored at") < logs_response.logs.index( - "Storing evaluation results into" + "Storing evaluation results to" + ) + assert logs_response.logs.index("Inference results stored at") < logs_response.logs.index( + "Storing evaluation results for S3 to" ) # Delete the experiment
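
For quick reference while reviewing, below is a minimal, self-contained sketch of the job-name sanitization and the `{prefix}/{safe_name}/{job_id}/results.json` fallback layout that `save_to_s3()` builds in this diff. It only mirrors the regex and path scheme introduced above rather than importing the actual modules; the bucket, prefix, and job name are made-up example values.

import re
from uuid import uuid4

# Mirrors SAFE_JOB_NAME_REGEX / JOB_NAME_REPLACEMENT_CHAR from this diff:
# anything outside [A-Za-z0-9_], "-", "_" and "." is replaced with "-".
SAFE_JOB_NAME_REGEX = re.compile(r"[^\w\-_.]")
JOB_NAME_REPLACEMENT_CHAR = "-"


def sanitize_job_name(job_name: str) -> str:
    """Replace characters that the regex above does not consider S3-safe."""
    return re.sub(SAFE_JOB_NAME_REGEX, JOB_NAME_REPLACEMENT_CHAR, job_name)


if __name__ == "__main__":
    job_id = uuid4()
    job_name = "summarize/news run #1"                  # made-up example name
    storage_path = "s3://example-bucket/jobs/results/"  # assumed prefix ending in "/"

    # Fallback path built when storage_path carries no explicit filename (ends with "/"):
    safe_name = sanitize_job_name(job_name)             # -> "summarize-news-run--1"
    prefix = storage_path.removeprefix("s3://").rstrip("/")
    print(f"s3://{prefix}/{safe_name}/{job_id}/results.json")
    # e.g. s3://example-bucket/jobs/results/summarize-news-run--1/<job_id>/results.json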