Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions lib/runtime/src/transports/event_plane/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ pub struct EventPublisher {
sequence: AtomicU64,
tx: Arc<dyn EventTransportTx>,
codec: Arc<Codec>,
runtime_handle: tokio::runtime::Handle,
/// Discovery client and registered instance for unregistration on drop
discovery_client: Option<Arc<dyn Discovery>>,
discovery_instance: Option<crate::discovery::DiscoveryInstance>,
Expand Down Expand Up @@ -335,6 +336,7 @@ impl EventPublisher {
) -> Result<Self> {
let publisher_id = drt.discovery().instance_id();
let discovery = Some(drt.discovery());
let runtime_handle = drt.runtime().secondary();

// Use Msgpack codec for all transports
enum TransportSetup {
Expand Down Expand Up @@ -464,6 +466,7 @@ impl EventPublisher {
sequence: AtomicU64::new(0),
tx,
codec,
runtime_handle,
discovery_client: discovery,
discovery_instance,
})
Expand Down Expand Up @@ -515,27 +518,39 @@ impl Drop for EventPublisher {
{
let topic = self.topic.clone();
let instance_id = instance.instance_id();

// Spawn background task for async unregister since Drop is sync
tokio::spawn(async move {
match discovery.unregister(instance).await {
Ok(()) => {
tracing::info!(
topic = %topic,
instance_id = %instance_id,
"EventPublisher unregistered from discovery"
);
}
Err(e) => {
tracing::warn!(
topic = %topic,
instance_id = %instance_id,
error = %e,
"Failed to unregister EventPublisher from discovery"
);
let runtime_handle = self.runtime_handle.clone();

// Drop can run outside any Tokio context (notably via PyO3 finalizers), so use
// the runtime that created the publisher rather than the ambient thread state.
let spawn_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
runtime_handle.spawn(async move {
match discovery.unregister(instance).await {
Ok(()) => {
tracing::info!(
topic = %topic,
instance_id = %instance_id,
"EventPublisher unregistered from discovery"
);
}
Err(e) => {
tracing::warn!(
topic = %topic,
instance_id = %instance_id,
error = %e,
"Failed to unregister EventPublisher from discovery"
);
}
}
}
});
});
}));

if spawn_result.is_err() {
tracing::warn!(
topic = %self.topic,
instance_id = %instance_id,
"Skipping EventPublisher unregister during drop because the runtime is unavailable"
);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ filterwarnings = [
"ignore:The pynvml package is deprecated.*:FutureWarning", # Ignore pynvml deprecation warning, temporary until upstream library updates to nvidia-ml-py
"ignore:The behavior of DataFrame concatenation with empty or all-NA entries is deprecated.*:FutureWarning", # pandas 2.x concat deprecation in AIC SDK TODO: fix in AIC
"ignore:Automatic KV events configuration is deprecated.*:FutureWarning", # Ignore Dynamo's own KV events deprecation warning in tests
"ignore:builtin type (SwigPyPacked|SwigPyObject|swigvarlink) has no __module__ attribute:DeprecationWarning", # Python 3.12 SWIG extension warning from third-party tokenizer deps
# Pydantic V2 deprecation warnings from TRTLLM dependencies (raised at import time during collection)
"ignore:Support for class-based `config`.*:pydantic.warnings.PydanticDeprecatedSince20",
"ignore:Using extra keyword arguments on `Field`.*:pydantic.warnings.PydanticDeprecatedSince20",
Expand Down Expand Up @@ -330,4 +331,3 @@ extra_content_footer = [
<script type="text/javascript">if (typeof _satellite !== "undefined") {_satellite.pageBottom();}</script>
''',
]

9 changes: 1 addition & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,14 +459,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def stop(self):
"""Stop the NATS server for restart. Does not release port or clean up fully."""
_logger.info(f"Stopping NATS server on port {self.port}")
self._terminate_process_group()
proc = self.proc # type: ignore[has-type]
if proc is not None:
try:
proc.wait(timeout=10)
except Exception as e:
_logger.warning(f"Error waiting for NATS process to stop: {e}")
self.proc = None
self._stop_started_processes()

def start(self):
"""Restart a stopped NATS server with fresh state."""
Expand Down
5 changes: 2 additions & 3 deletions tests/router/test_router_e2e_with_mockers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
pytest.mark.gpu_0,
pytest.mark.integration,
pytest.mark.model(MODEL_NAME),
pytest.mark.skip(reason="DYN-2365 - Flaky, temporarily disabled"),
]
NUM_MOCKERS = 2
SPEEDUP_RATIO = 10.0
Expand Down Expand Up @@ -820,7 +819,7 @@ def test_kv_router_bindings(
],
indirect=["request_plane", "durable_kv_events"],
)
@pytest.mark.timeout(180)
@pytest.mark.timeout(300)
def test_indexers_sync(
request,
runtime_services_dynamic_ports,
Expand Down Expand Up @@ -927,7 +926,7 @@ def test_query_instance_id_returns_worker_and_tokens(
)


@pytest.mark.timeout(90) # bumped for xdist contention (was 29s; ~9.55s serial avg)
@pytest.mark.timeout(300) # bumped for xdist contention (was 29s; ~9.55s serial avg)
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.parametrize(
"durable_kv_events,use_kv_events,zmq_kv_events",
Expand Down
60 changes: 46 additions & 14 deletions tests/utils/managed_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,20 +296,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
except psutil.NoSuchProcess:
pass

self._terminate_process_group()

process_list = [self.proc, self._tee_proc, self._sed_proc]
for process in process_list:
if process:
try:
if process.stdout:
process.stdout.close()
if process.stdin:
process.stdin.close()
terminate_process_tree(process.pid, self._logger)
process.wait()
except Exception as e:
self._logger.warning("Error terminating process: %s", e)
self._stop_started_processes()

# Kill any child processes that survived the process group kill.
# This catches children in different PGIDs (e.g. MPI workers, engine
Expand All @@ -332,6 +319,46 @@ def __exit__(self, exc_type, exc_val, exc_tb):
# Always run straggler cleanup, even if interrupted
self._cleanup_stragglers()

def _stop_started_processes(self, wait_timeout: float = 10.0):
    """Terminate launched subprocesses and close any open pipe handles.

    This is used both during normal teardown and when a managed service
    needs to stop and restart in-place without releasing higher-level
    resources such as ports.

    Every step is best-effort: failures are logged as warnings and never
    raised, so one misbehaving process cannot prevent cleanup of the others.

    Args:
        wait_timeout: Seconds to wait for each process to exit after it has
            been asked to terminate. The same timeout is reused for the
            follow-up wait if the process has to be force-killed.
    """
    self._terminate_process_group()

    process_entries = [
        ("proc", self.proc),
        ("_tee_proc", self._tee_proc),
        ("_sed_proc", self._sed_proc),
    ]
    for attr_name, process in process_entries:
        if process is None:
            continue

        # Close each pipe on its own so that a failure on one stream does
        # not leave the remaining streams open.
        for stream_name in ("stdout", "stdin", "stderr"):
            stream = getattr(process, stream_name, None)
            if stream is None:
                continue
            try:
                stream.close()
            except Exception as e:
                self._logger.warning("Error closing process streams: %s", e)

        try:
            terminate_process_tree(process.pid, self._logger)
        except Exception as e:
            self._logger.warning("Error terminating process: %s", e)

        try:
            process.wait(timeout=wait_timeout)
        except Exception as e:
            self._logger.warning("Error waiting for process exit: %s", e)
            # The handle is dropped below, so this is the last chance to stop
            # a process that is still running; escalate to a hard kill rather
            # than leaving it behind (e.g. still holding its port).
            # NOTE(review): assumes the handle offers poll()/kill() like
            # subprocess.Popen -- confirm against _start_process.
            try:
                if process.poll() is None:
                    process.kill()
                    process.wait(timeout=wait_timeout)
            except Exception as kill_error:
                self._logger.warning(
                    "Error force-killing process: %s", kill_error
                )
        finally:
            # Always forget the handle so a later start() begins clean.
            setattr(self, attr_name, None)

    # The process group is gone (or abandoned) at this point.
    self._pgid = None

def _start_process(self):
assert self._command_name
assert self._log_path
Expand Down Expand Up @@ -434,6 +461,11 @@ def _terminate_process_group(self, timeout: float = 8.0):
poll_interval = 0.1
elapsed = 0.0
while elapsed < timeout:
# Reap the launched child if it has already exited. Without this,
# the child can remain as a zombie and keep killpg(..., 0) reporting
# the process group as alive until the timeout expires.
if self.proc is not None:
self.proc.poll()
try:
# Check if any process in the group is still alive
os.killpg(self._pgid, 0) # Signal 0 = check existence (no kill)
Expand Down
Loading