Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add early storage validation and improve error messages
Co-authored-by: slister1001 <[email protected]>
  • Loading branch information
Copilot and slister1001 committed Dec 11, 2025
commit 7565bd6bf1c8d2960c9959d4d418354245d7c8e5
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,69 @@ def update_red_team_run(self, *, name: str, red_team: RedTeamUpload, **kwargs):
update_run_response = self.rest_client.red_teams.upload_update_run(name=name, redteam=red_team, **kwargs)

return update_run_response

def test_storage_upload(self, **kwargs) -> bool:
"""Test storage account connectivity by performing a minimal upload operation.

This method validates that the storage account is accessible and the credentials
have the necessary permissions before proceeding with the full evaluation workflow.

:param kwargs: Additional keyword arguments to pass to the underlying API calls
:return: True if the test upload succeeds
:rtype: bool
:raises EvaluationException: If the storage account is inaccessible or lacks permissions
"""
import tempfile
import os
from azure.ai.evaluation._exceptions import EvaluationException, ErrorBlame, ErrorCategory, ErrorTarget

LOGGER.debug("Testing storage account connectivity...")

try:
# Create a test name and version
test_name = f"connectivity-test"
test_version = "1"

# Start a pending upload to get storage credentials
start_pending_upload_response = self.rest_client.evaluation_results.start_pending_upload(
name=test_name,
version=test_version,
body=PendingUploadRequest(pending_upload_type=PendingUploadType.TEMPORARY_BLOB_REFERENCE),
**kwargs,
)

# Create a temporary test file
with tempfile.TemporaryDirectory() as tmpdir:
test_file_path = os.path.join(tmpdir, "test.txt")
with open(test_file_path, "w") as f:
f.write("connectivity test")

# Attempt to upload the test file
with ContainerClient.from_container_url(
start_pending_upload_response.blob_reference_for_consumption.credential.sas_uri
) as container_client:
upload(path=test_file_path, container_client=container_client, logger=LOGGER)

LOGGER.debug("Storage account connectivity test successful")
return True

except Exception as e:
LOGGER.error(f"Storage account connectivity test failed: {str(e)}")

# Re-raise with helpful context
error_msg = (
f"Failed to connect to Azure Blob Storage. Error: {str(e)}. "
f"Please verify that:\n"
f" 1. The storage account exists and is accessible\n"
f" 2. Your credentials have the necessary permissions (Storage Blob Data Contributor role)\n"
f" 3. Network access to the storage account is not blocked by firewall rules\n"
f" 4. The Azure AI project is properly configured"
)

raise EvaluationException(
message=error_msg,
internal_message=f"Storage connectivity test failed: {e}",
target=ErrorTarget.RAI_CLIENT,
category=ErrorCategory.UPLOAD_ERROR,
blame=ErrorBlame.USER_ERROR,
)
Original file line number Diff line number Diff line change
Expand Up @@ -914,9 +914,26 @@ def upload(path: str, container_client: ContainerClient, logger=None):
logger.debug(f"File '{local}' uploaded successfully")

except Exception as e:
# Extract storage account information if available
storage_info = ""
try:
account_name = container_client.account_name if hasattr(container_client, 'account_name') else "unknown"
storage_info = f" Storage account: {account_name}."
except Exception:
pass

error_msg = (
f"Failed to upload evaluation results to Azure Blob Storage.{storage_info} "
f"Error: {str(e)}. "
f"Please verify that:\n"
f" 1. The storage account exists and is accessible\n"
f" 2. Your credentials have the necessary permissions (Storage Blob Data Contributor role)\n"
f" 3. Network access to the storage account is not blocked by firewall rules"
)

raise EvaluationException(
message=f"Error uploading file: {e}",
internal_message=f"Error uploading file: {e}",
message=error_msg,
internal_message=f"Error uploading file to blob storage: {e}",
target=ErrorTarget.RAI_CLIENT,
category=ErrorCategory.UPLOAD_ERROR,
blame=ErrorBlame.SYSTEM_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,26 @@ def start_redteam_mlflow_run(

return eval_run

def test_storage_upload(self) -> bool:
"""Test storage account connectivity before starting the scan.

This method validates that storage upload will work by testing with the
appropriate client (OneDP or MLFlow) depending on project configuration.

:return: True if the test upload succeeds
:rtype: bool
:raises EvaluationException: If the storage account is inaccessible or lacks permissions
"""
if self._one_dp_project:
# For OneDP projects, test using the evaluation client
return self.generated_rai_client._evaluation_onedp_client.test_storage_upload()
else:
# For non-OneDP projects (MLFlow), we don't have a direct upload test
# Storage is tested when we create artifacts during log_artifact
# So we just return True here and let the actual upload fail if there are issues
self.logger.debug("Storage upload test skipped for non-OneDP project (will be tested during actual upload)")
return True

async def log_redteam_results_to_mlflow(
self,
redteam_result: RedTeamResult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,17 @@ async def scan(
# Update result processor with the AI studio URL now that it's available
self.result_processor.ai_studio_url = self.mlflow_integration.ai_studio_url

# Test storage connectivity early to catch issues before running attacks
try:
tqdm.write("🔍 Validating storage account connectivity...")
self.mlflow_integration.test_storage_upload()
tqdm.write("✅ Storage account validation successful")
except Exception as e:
# Log the error and re-raise to stop the scan early
self.logger.error(f"Storage account validation failed: {str(e)}")
tqdm.write(f"❌ Storage account validation failed: {str(e)}")
raise

# Process strategies and execute scan
flattened_attack_strategies = get_flattened_attack_strategies(attack_strategies)
self._validate_strategies(flattened_attack_strategies)
Expand Down
Loading