test: Add vLLM disagg cancellation tests
kthui committed Aug 26, 2025
commit ebd98e94190af1b6d48e53b29278bd7c30febe11
45 changes: 31 additions & 14 deletions tests/fault_tolerance/README.md
@@ -14,7 +14,7 @@ Tests worker fault tolerance with migration support using the `test_request_migr
- Model: `deepseek-ai/DeepSeek-R1-Distill-Llama-8B`
- `--enforce-eager`, `--gpu-memory-utilization 0.45`
- `--max-model-len 8192`, `--migration-limit 3`
3. Waits for both workers to be fully ready (looking for "Reading Events from" messages)
3. Waits for both workers to be fully ready (health check returns "ready" status)
4. Sends a test request ("Who are you?", 100 tokens) to determine which worker handles requests
5. Determines primary/backup worker roles based on round-robin routing and log analysis
6. Sends a long completion request ("Tell me a long long long story about yourself?", 8000 tokens) in a separate thread
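The separate-thread step can be sketched as below (a minimal sketch; `send_long_completion` is a hypothetical stand-in for the HTTP POST the test makes to the frontend's `/v1/completions` endpoint):

```python
import threading

def send_long_completion(prompt: str, max_tokens: int) -> str:
    # Hypothetical stand-in for the real HTTP call to /v1/completions,
    # which the test issues via `requests` with a long client timeout.
    return f"completed {max_tokens}-token request"

result: dict[str, str] = {}

def run_request() -> None:
    # The response is captured for validation after the primary worker is
    # killed and the request has migrated to the backup worker.
    result["response"] = send_long_completion(
        "Tell me a long long long story about yourself?", 8000
    )

# Run the request in a separate thread so the main test thread stays free
# to kill the primary worker while the request is in flight.
t = threading.Thread(target=run_request)
t.start()
t.join()
print(result["response"])
```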
@@ -24,13 +24,16 @@ Tests worker fault tolerance with migration support using the `test_request_migr

### `test_request_cancellation.py`

Tests request cancellation functionality across multiple API endpoints using the `test_request_cancellation_vllm` function. This test:
Tests request cancellation functionality across multiple API endpoints and deployment configurations. Contains three test functions:

#### `test_request_cancellation_vllm`
Tests basic request cancellation with a single worker:

0. Downloads the DeepSeek-R1-Distill-Llama-8B model from HuggingFace if not already cached
1. Starts a Dynamo frontend using `python -m dynamo.frontend` with debug logging enabled
2. Starts a single worker using `python3 -m dynamo.vllm` with specific configuration:
- Model: `deepseek-ai/DeepSeek-R1-Distill-Llama-8B`
- `--enforce-eager`, `--max-model-len 16384`, `--migration-limit 3`
- `--enforce-eager`, `--gpu-memory-utilization 0.45`, `--max-model-len 8192`, `--migration-limit 3`
- Debug logging enabled on port 8081
3. Tests request cancellation across three scenarios:
- **Completion API**: `/v1/completions` endpoint cancellation
@@ -42,11 +45,30 @@ Tests request cancellation functionality across multiple API endpoints using the
- Uses incremental log offset tracking to avoid false positives from previous tests
5. Checks for specific cancellation patterns:
- Frontend log: "issued control message Kill to sender"
- Worker log: "finished processing python async generator stream"
- Worker log: "Aborted Request ID: <request_id>" matching the "New Request ID: <request_id>"
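
The incremental offset tracking in step 4 can be sketched as below (a sketch of the idea only; the real test re-reads each log file on every check and slices from the previous offset):

```python
def new_log_content(full_log: str, offset: int) -> tuple[str, int]:
    """Return only content appended since `offset`, plus the updated offset."""
    return full_log[offset:], len(full_log)

log = "scenario 1: Aborted Request ID: aaa\n"
fresh, offset = new_log_content(log, 0)
assert "aaa" in fresh  # scenario 1 sees its own abort message

# Later scenarios only see content written after the previous check, so a
# match left over from scenario 1 cannot produce a false positive.
log += "scenario 2: Aborted Request ID: bbb\n"
fresh, offset = new_log_content(log, offset)
print("aaa" in fresh, "bbb" in fresh)  # False True
```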

#### `test_request_cancellation_vllm_decode`
Tests request cancellation during disaggregated decode phase:

0. Downloads the DeepSeek-R1-Distill-Llama-8B model from HuggingFace if not already cached
1. Starts a Dynamo frontend using `python -m dynamo.frontend` with debug logging enabled
2. Starts a prefill worker using `python3 -m dynamo.vllm --is-prefill-worker` on port 8082
3. Starts a decode worker using `python3 -m dynamo.vllm` on port 8081
4. Tests completion request cancellation in the disaggregated setup
5. Validates cancellation messages appear in prefill worker, decode worker, and frontend logs
6. Checks for specific patterns:
- Frontend log: "issued control message Kill to sender"
- Decode worker log: "Aborted Request ID: <request_id>"
- Prefill worker log: "New Prefill Request ID: <request_id>"

#### `test_request_cancellation_vllm_prefill`
Tests request cancellation during disaggregated prefill phase:

- (Skipped until request cancellation can cancel before receiving the first response)

## Prerequisites

- vLLM backend installed (`pip install ai-dynamo-vllm`)
- vLLM backend installed
- NATS and etcd services running (provided by `runtime_services` fixture)
- Access to DeepSeek-R1-Distill-Llama-8B model (automatically downloaded from HuggingFace)
- Sufficient GPU memory
@@ -57,18 +79,12 @@ To run the fault tolerance tests:

```bash
# Run all fault tolerance tests
pytest /workspace/tests/fault_tolerance

# Run specific test with verbose output
pytest /workspace/tests/fault_tolerance/test_request_migration.py::test_request_migration_vllm -v
pytest /workspace/tests/fault_tolerance/test_request_cancellation.py::test_request_cancellation_vllm -v

# Run with specific markers
pytest -m "e2e and vllm" /workspace/tests/fault_tolerance

# Run with debug logging
# Run specific test functions with debug logging
pytest /workspace/tests/fault_tolerance/test_request_migration.py::test_request_migration_vllm -v -s
pytest /workspace/tests/fault_tolerance/test_request_cancellation.py::test_request_cancellation_vllm -v -s
pytest /workspace/tests/fault_tolerance/test_request_cancellation.py::test_request_cancellation_vllm_decode -v -s
```

## Test Markers
@@ -89,7 +105,7 @@ The tests typically take 2-3 minutes to complete each, including:
- Model download/loading time (if not cached) - can take 1-2 minutes for first run
- Worker startup and registration
- Request processing and response validation
- Worker failure simulation and migration (for migration test) / Request cancellation validation (for cancellation test)
- Worker failure simulation and migration (for migration test) / Request cancellation validation (for cancellation tests)
- Cleanup

## Troubleshooting
@@ -104,3 +120,4 @@ If tests fail:
6. Verify that the DeepSeek-R1-Distill-Llama-8B model can be accessed
7. For cancellation tests: Check that timeout-based cancellation is working properly and cancellation patterns appear in logs
8. For migration tests: Verify worker process termination and stream recreation behavior
9. For disaggregated cancellation tests: Ensure both prefill and decode workers are properly started and cancellation works across the disaggregated setup
172 changes: 142 additions & 30 deletions tests/fault_tolerance/test_request_cancellation.py
@@ -48,28 +48,39 @@ def __init__(self, request):
class DynamoWorkerProcess(ManagedProcess):
"""Process manager for Dynamo worker with vLLM backend"""

def __init__(self, request):
def __init__(self, request, is_prefill: bool = False):
command = [
"python3",
"-m",
"dynamo.vllm",
"--model",
"deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
"--enforce-eager",
"--gpu-memory-utilization",
"0.45",
"--max-model-len",
"16384",
"8192",
"--migration-limit",
"3",
]

# Add prefill worker flag if needed
if is_prefill:
command.append("--is-prefill-worker")

# Set port based on worker type
port = "8082" if is_prefill else "8081"

# Set debug logging environment
env = os.environ.copy()
env["DYN_LOG"] = "debug"
env["DYN_SYSTEM_ENABLED"] = "true"
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
env["DYN_SYSTEM_PORT"] = "8081"
env["DYN_SYSTEM_PORT"] = port

log_dir = f"{request.node.name}_worker"
# Set log directory based on worker type
worker_type = "prefill_worker" if is_prefill else "worker"
log_dir = f"{request.node.name}_{worker_type}"

# Clean up any existing log directory from previous runs
try:
@@ -82,13 +93,15 @@ def __init__(self, request):
super().__init__(
command=command,
env=env,
health_check_urls=[("http://localhost:8081/health", self.is_ready)],
health_check_urls=[(f"http://localhost:{port}/health", self.is_ready)],
timeout=300,
display_output=True,
terminate_existing=True,
terminate_existing=False,
log_dir=log_dir,
)

self.is_prefill = is_prefill

def get_pid(self):
"""Get the PID of the worker process"""
return self.proc.pid if self.proc else None
@@ -98,11 +111,14 @@ def is_ready(self, response) -> bool:
try:
data = response.json()
if data.get("status") == "ready":
logger.info("Worker status is ready")
worker_type = "Prefill worker" if self.is_prefill else "Worker"
logger.info(f"{worker_type} status is ready")
return True
logger.warning(f"Worker status is not ready: {data.get('status')}")
worker_type = "Prefill worker" if self.is_prefill else "Worker"
logger.warning(f"{worker_type} status is not ready: {data.get('status')}")
except ValueError:
logger.warning("Worker health response is not valid JSON")
worker_type = "Prefill worker" if self.is_prefill else "Worker"
logger.warning(f"{worker_type} health response is not valid JSON")
return False


@@ -211,19 +227,18 @@ def send_chat_completion_request(
raise


def send_request_and_cancel(request_type: str = "completion"):
def send_request_and_cancel(request_type: str = "completion", timeout: int = 1):
"""Send a request with short timeout to trigger cancellation"""
logger.info(f"Sending {request_type} request to be cancelled...")

prompt = "Tell me a very long and detailed story about the history of artificial intelligence, including all major milestones, researchers, and breakthroughs?"
timeout = 1 # Short timeout to trigger cancellation
try:
if request_type == "completion":
response = send_completion_request(prompt, 16000, timeout)
response = send_completion_request(prompt, 8000, timeout)
elif request_type == "chat_completion":
response = send_chat_completion_request(prompt, 16000, timeout, False)
response = send_chat_completion_request(prompt, 8000, timeout, False)
elif request_type == "chat_completion_stream":
response = send_chat_completion_request(prompt, 16000, timeout, True)
response = send_chat_completion_request(prompt, 8000, timeout, True)
# Read a few responses and then disconnect
if response.status_code == 200:
itr_count, max_itr = 0, 5
Expand Down Expand Up @@ -268,10 +283,12 @@ def strip_ansi_codes(text: str) -> str:


def verify_request_cancelled(
worker_process: DynamoWorkerProcess,
frontend_process: DynamoFrontendProcess,
worker_log_offset: int = 0,
worker_process: DynamoWorkerProcess,
prefill_worker_process: DynamoWorkerProcess | None = None,
frontend_log_offset: int = 0,
worker_log_offset: int = 0,
prefill_worker_log_offset: int = 0,
) -> tuple[int, int]:
"""Verify that the worker and frontend logs contain cancellation messages

@@ -294,24 +311,42 @@ def verify_request_cancelled(
if len(parts) > 1:
request_id = parts[-1].strip()
break
if request_id is None:
pytest.fail("Could not find 'New Request ID: <id>' pattern in worker log")

# Check if the same request ID was cancelled
has_worker_cancellation = False
if request_id:
cancellation_pattern = f"Aborted Request ID: {request_id}"
for line in new_worker_content.split("\n"):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(cancellation_pattern):
has_worker_cancellation = True
break
else:
pytest.fail("Could not find 'New Request ID: <id>' pattern in worker log")
cancellation_pattern = f"Aborted Request ID: {request_id}"
for line in new_worker_content.split("\n"):
# Strip ANSI codes and whitespace for pattern matching
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(cancellation_pattern):
has_worker_cancellation = True
break
if not has_worker_cancellation:
pytest.fail(
f"Could not find 'Aborted Request ID: {request_id}' pattern in worker log"
)

# Check if the same request ID was remote prefilled
if prefill_worker_process is not None:
prefill_worker_log_content = read_log_content(prefill_worker_process._log_path)
new_prefill_worker_content = prefill_worker_log_content[
prefill_worker_log_offset:
]

has_remote_prefill = False
remote_prefill_pattern = f"New Prefill Request ID: {request_id}"
for line in new_prefill_worker_content.split("\n"):
clean_line = strip_ansi_codes(line).strip()
if clean_line.endswith(remote_prefill_pattern):
has_remote_prefill = True
break
if not has_remote_prefill:
pytest.fail(
f"Could not find 'New Prefill Request ID: {request_id}' pattern in prefill worker log"
)

# Check frontend log for cancellation issued pattern
frontend_log_content = read_log_content(frontend_process._log_path)
new_frontend_content = frontend_log_content[frontend_log_offset:]
@@ -327,7 +362,7 @@
if not has_kill_message:
pytest.fail("Could not find cancellation issued in frontend log")

return len(worker_log_content), len(frontend_log_content)
return len(frontend_log_content), len(worker_log_content)


@pytest.mark.vllm
@@ -364,7 +399,7 @@ def test_request_cancellation_vllm(request, runtime_services):
time.sleep(2)

# Step 3: Test request cancellation
worker_log_offset, frontend_log_offset = 0, 0
frontend_log_offset, worker_log_offset = 0, 0

test_scenarios = [
("completion", "Completion request cancellation"),
@@ -383,12 +418,89 @@
"Checking for cancellation messages in worker and frontend logs..."
)
time.sleep(0.5) # Make sure logs are written before proceeding
worker_log_offset, frontend_log_offset = verify_request_cancelled(
worker, frontend, worker_log_offset, frontend_log_offset
frontend_log_offset, worker_log_offset = verify_request_cancelled(
frontend,
worker,
frontend_log_offset=frontend_log_offset,
worker_log_offset=worker_log_offset,
)

logger.info(f"{description} detected successfully")

logger.info(
"All request cancellation tests completed successfully - request cancellation is working correctly"
)


@pytest.mark.vllm
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.slow
def test_request_cancellation_vllm_decode(request, runtime_services):
"""
End-to-end test for request cancellation functionality with remote prefill.

This test verifies that when a request is cancelled by the client,
the system properly handles the cancellation and cleans up resources
on the decode worker side in a disaggregated setup.
"""
# Step 0: Download the model from HuggingFace if not already cached
download_model()

# Step 1: Start the frontend
with DynamoFrontendProcess(request) as frontend:
logger.info("Frontend started successfully")

# Step 2: Start the prefill worker
logger.info("Starting prefill worker...")
prefill_worker = DynamoWorkerProcess(request, is_prefill=True)

with prefill_worker:
logger.info(f"Prefill Worker PID: {prefill_worker.get_pid()}")

# Step 3: Start the decode worker
logger.info("Starting decode worker...")
decode_worker = DynamoWorkerProcess(request, is_prefill=False)

with decode_worker:
logger.info(f"Decode Worker PID: {decode_worker.get_pid()}")

# TODO: Why the model is not immediately available at the frontend after health check
# returns success.
time.sleep(2)

# Step 4: Test request cancellation for completion scenario only
logger.info(
"Testing completion request cancellation in disaggregated mode..."
)
send_request_and_cancel("completion")

logger.info(
"Checking for cancellation messages in decode worker, prefill worker, and frontend logs..."
)
time.sleep(0.5) # Make sure logs are written before proceeding
verify_request_cancelled(frontend, decode_worker, prefill_worker)

logger.info(
"Completion request cancellation detected successfully in disaggregated mode"
)

logger.info(
"Request cancellation test completed successfully in disaggregated mode - request cancellation is working correctly"
)


@pytest.mark.skip(reason="require cancel support before receiving 1st response")
@pytest.mark.vllm
@pytest.mark.gpu_1
@pytest.mark.e2e
@pytest.mark.slow
def test_request_cancellation_vllm_prefill(request, runtime_services):
"""
End-to-end test for request cancellation on remote prefill.

This test verifies that when a request is cancelled by the client during the
prefill phase, the system properly handles the cancellation and cleans up
resources on the prefill worker and decode worker sides in a disaggregated
setup.
"""