Skip to content
Prev Previous commit
Next Next commit
enable test coverage for subprocesses, to include real transport cove…
…rage
  • Loading branch information
shahar4499 committed Apr 17, 2025
commit 295ff6c059a5907ab43c2860a0d49a0aead0025d
15 changes: 14 additions & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
[run]
omit =
examples/*
tests/*
tests/*
concurrency = multiprocessing
parallel = true
sigterm = true
data_file = .coverage
source = fastapi_mcp
debug = config,dataio,process,multiprocess,dataop,pid,trace

[report]
show_missing = true

[paths]
source =
fastapi_mcp/
2 changes: 1 addition & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[pytest]
addopts = -vvv --cov=. --cov-report xml --cov-report term-missing --cov-fail-under=80
addopts = -vvv --cov=. --cov-report xml --cov-report term-missing --cov-fail-under=80 --cov-config=.coveragerc
asyncio_mode = auto
log_cli = true
log_cli_level = DEBUG
Expand Down
29 changes: 29 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import sys
import os
import pytest
import coverage

# Add the parent directory to the path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
Expand All @@ -8,3 +10,30 @@
from .fixtures.example_data import * # noqa: F403
from .fixtures.simple_app import * # noqa: F403
from .fixtures.complex_app import * # noqa: F403


@pytest.hookimpl(trylast=True)
def pytest_configure(config):
"""Configure pytest-cov for proper subprocess coverage."""
if config.pluginmanager.hasplugin("pytest_cov"):
# Ensure environment variables are set for subprocess coverage
os.environ["COVERAGE_PROCESS_START"] = os.path.abspath(".coveragerc")

# Set up environment for combinining coverage data from subprocesses
os.environ["PYTHONPATH"] = os.path.abspath(".")

# Make sure the pytest-cov plugin is active for subprocesses
config.option.cov_fail_under = 0 # Disable fail under in the primary process


@pytest.hookimpl(trylast=True)
def pytest_sessionfinish(session, exitstatus):
"""Combine coverage data from subprocesses at the end of the test session."""
cov_dir = os.path.abspath(".")
if exitstatus == 0 and os.environ.get("COVERAGE_PROCESS_START"):
try:
cov = coverage.Coverage()
cov.combine(data_paths=[cov_dir], strict=True)
cov.save()
except Exception as e:
print(f"Error combining coverage data: {e}", file=sys.stderr)
65 changes: 61 additions & 4 deletions tests/test_sse_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
import multiprocessing
import socket
import time
import os
import signal
import atexit
import sys
import threading
import coverage
from typing import AsyncGenerator, Generator
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
Expand Down Expand Up @@ -32,6 +38,40 @@ def server_url(server_port: int) -> str:


def run_server(server_port: int) -> None:
# Initialize coverage for subprocesses
cov = None
if "COVERAGE_PROCESS_START" in os.environ:
cov = coverage.Coverage(source=["fastapi_mcp"])
cov.start()

# Create a function to save coverage data at exit
def cleanup():
if cov:
cov.stop()
cov.save()

# Register multiple cleanup mechanisms to ensure coverage data is saved
atexit.register(cleanup)

# Setup signal handler for clean termination
def handle_signal(signum, frame):
cleanup()
sys.exit(0)

signal.signal(signal.SIGTERM, handle_signal)

# Backup thread to ensure coverage is written if process is terminated abruptly
def periodic_save():
while True:
time.sleep(1.0)
if cov:
cov.save()

save_thread = threading.Thread(target=periodic_save)
save_thread.daemon = True
save_thread.start()

# Configure the server
fastapi = make_simple_fastapi_app()
mcp = FastApiMCP(
fastapi,
Expand All @@ -40,16 +80,26 @@ def run_server(server_port: int) -> None:
)
mcp.mount()

# Start the server
server = uvicorn.Server(config=uvicorn.Config(app=fastapi, host=HOST, port=server_port, log_level="error"))
server.run()

# Give server time to start
while not server.started:
time.sleep(0.5)

# Ensure coverage is saved if exiting the normal way
if cov:
cov.stop()
cov.save()


@pytest.fixture()
def server(server_port: int) -> Generator[None, None, None]:
# Ensure COVERAGE_PROCESS_START is set in the environment for subprocesses
coverage_rc = os.path.abspath(".coveragerc")
os.environ["COVERAGE_PROCESS_START"] = coverage_rc

proc = multiprocessing.Process(target=run_server, kwargs={"server_port": server_port}, daemon=True)
proc.start()

Expand All @@ -69,11 +119,18 @@ def server(server_port: int) -> Generator[None, None, None]:

yield

# Signal the server to stop
proc.kill()
proc.join(timeout=2)
# Signal the server to stop - added graceful shutdown before kill
try:
proc.terminate()
proc.join(timeout=2)
except (OSError, AttributeError):
pass

if proc.is_alive():
raise RuntimeError("server process failed to terminate")
proc.kill()
proc.join(timeout=2)
if proc.is_alive():
raise RuntimeError("server process failed to terminate")


@pytest.fixture()
Expand Down