Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial commit of health check changes
  • Loading branch information
nnshah1 committed Jul 17, 2025
commit 999cf5d900e34c2270a1c7051634a3f5efff8244
5 changes: 3 additions & 2 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ enum ModelType {
#[pymethods]
impl DistributedRuntime {
#[new]
fn new(event_loop: PyObject, is_static: bool) -> PyResult<Self> {
#[pyo3(signature = (event_loop, is_static, wait_for_served=None))]
fn new(event_loop: PyObject, is_static: bool, wait_for_served: Option<Vec<String>>) -> PyResult<Self> {
let worker = rs::Worker::from_settings().map_err(to_pyerr)?;
INIT.get_or_try_init(|| {
let primary = worker.tokio_runtime()?;
Expand All @@ -258,7 +259,7 @@ impl DistributedRuntime {
} else {
runtime
.secondary()
.block_on(rs::DistributedRuntime::from_settings(runtime))
.block_on(rs::DistributedRuntime::from_settings(runtime, wait_for_served))
};
let inner = inner.map_err(to_pyerr)?;

Expand Down
4 changes: 2 additions & 2 deletions lib/bindings/python/src/dynamo/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
from dynamo._core import OAIChatPreprocessor as OAIChatPreprocessor


def dynamo_worker(static=False):
def dynamo_worker(static=False, wait_for_served=None):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, static)
runtime = DistributedRuntime(loop, static, wait_for_served)

await func(runtime, *args, **kwargs)

Expand Down
3 changes: 2 additions & 1 deletion lib/llm/src/entrypoint/input/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ pub async fn prepare_engine(
) -> anyhow::Result<PreparedEngine> {
match engine_config {
EngineConfig::Dynamic(local_model) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let distributed_runtime =
DistributedRuntime::from_settings(runtime.clone(), None).await?;

let Some(etcd_client) = distributed_runtime.etcd_client() else {
anyhow::bail!("Cannot be both static mode and run with dynamic discovery.");
Expand Down
3 changes: 2 additions & 1 deletion lib/llm/src/entrypoint/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
.build()?;
match engine_config {
EngineConfig::Dynamic(_) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let distributed_runtime =
DistributedRuntime::from_settings(runtime.clone(), None).await?;
match distributed_runtime.etcd_client() {
Some(etcd_client) => {
let router_config = engine_config.local_model().router_config();
Expand Down
2 changes: 1 addition & 1 deletion lib/runtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl RuntimeConfig {
/// 2. /opt/dynamo/etc/runtime.toml
/// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
///
/// Environment variables are prefixed with `DYN_RUNTIME_`
/// Environment variables are prefixed with `DYN_RUNTIME_` or `DYN_SYSTEM_`
pub fn from_settings() -> Result<RuntimeConfig> {
let config: RuntimeConfig = Self::figment().extract()?;
config.validate()?;
Expand Down
28 changes: 23 additions & 5 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio_util::sync::CancellationToken;
impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let secondary = runtime.secondary();
let (etcd_config, nats_config, is_static) = config.dissolve();
let (etcd_config, nats_config, is_static, wait_for_served) = config.dissolve();

let runtime_clone = runtime.clone();

Expand Down Expand Up @@ -65,6 +65,14 @@ impl DistributedRuntime {
})
.await??;

let served_endpoints_hashmap: HashMap<String, bool> = wait_for_served
.unwrap_or_default()
.into_iter()
.map(|k| (k, false))
.collect();

let served_endpoints = Arc::new(Mutex::new(served_endpoints_hashmap));

let distributed_runtime = Self {
runtime,
etcd_client,
Expand All @@ -74,6 +82,7 @@ impl DistributedRuntime {
is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())),
start_time: std::time::Instant::now(),
served_endpoints,
};

// Start HTTP server for health and metrics (if enabled)
Expand Down Expand Up @@ -104,14 +113,17 @@ impl DistributedRuntime {
Ok(distributed_runtime)
}

pub async fn from_settings(runtime: Runtime) -> Result<Self> {
let config = DistributedConfig::from_settings(false);
pub async fn from_settings(
runtime: Runtime,
wait_for_served: Option<Vec<String>>,
) -> Result<Self> {
let config = DistributedConfig::from_settings(false, wait_for_served);
Self::new(runtime, config).await
}

// Call this if you are using static workers that do not need etcd-based discovery.
pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
let config = DistributedConfig::from_settings(true);
let config = DistributedConfig::from_settings(true, None);
Self::new(runtime, config).await
}

Expand Down Expand Up @@ -203,14 +215,19 @@ pub struct DistributedConfig {
pub etcd_config: etcd::ClientOptions,
pub nats_config: nats::ClientOptions,
pub is_static: bool,
pub wait_for_served: Option<Vec<String>>,
}

impl DistributedConfig {
pub fn from_settings(is_static: bool) -> DistributedConfig {
pub fn from_settings(
is_static: bool,
wait_for_served: Option<Vec<String>>,
) -> DistributedConfig {
DistributedConfig {
etcd_config: etcd::ClientOptions::default(),
nats_config: nats::ClientOptions::default(),
is_static,
wait_for_served,
}
}

Expand All @@ -219,6 +236,7 @@ impl DistributedConfig {
etcd_config: etcd::ClientOptions::default(),
nats_config: nats::ClientOptions::default(),
is_static: false,
wait_for_served: None,
};

config.etcd_config.attach_lease = false;
Expand Down
43 changes: 38 additions & 5 deletions lib/runtime/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router}
use prometheus::{
proto, register_gauge_with_registry, Encoder, Gauge, Opts, Registry, TextEncoder,
};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -145,11 +147,44 @@ pub async fn spawn_http_server(
}

/// Health handler
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
tracing::info!("[health_handler] called");
let uptime = state.drt.uptime();
let response = format!("OK\nUptime: {} seconds\n", uptime.as_secs());
(StatusCode::OK, response)
let map = state.drt.served_endpoints.lock().await;
let mut healthy = true;
let mut status_code = StatusCode::OK;
let mut endpoints: HashMap<String, String> = HashMap::<String, String>::new();

// Check Status for all Endpoints

for (endpoint, ready) in map.iter() {
healthy = healthy && *ready;
endpoints.insert(
endpoint.to_string(),
if *ready {
"ready".to_string()
} else {
"not_ready".to_string()
},
);
}

let mut healthy_string = "ready";

if !healthy {
healthy_string = "not_ready";
status_code = StatusCode::SERVICE_UNAVAILABLE;
}

let response = json!({
"status": healthy_string,
"uptime": uptime.as_secs(),
"endpoints":endpoints
});

tracing::trace!("Response {}", response.to_string());

(status_code, response.to_string())
}

/// Metrics handler with DistributedRuntime uptime
Expand Down Expand Up @@ -264,7 +299,6 @@ mod tests {
assert!(response.contains("Total uptime of the DistributedRuntime in seconds"));
}

/*
#[tokio::test]
async fn test_spawn_http_server_endpoints() {
use std::sync::Arc;
Expand Down Expand Up @@ -324,5 +358,4 @@ mod tests {
}
}
}
*/
}
4 changes: 4 additions & 0 deletions lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,8 @@ pub struct DistributedRuntime {

// Start time for tracking uptime
start_time: std::time::Instant,

// Endpoints to wait for before being considered ready via system health,
// each mapped to whether it is ready. Has no effect if system endpoints are not enabled.
served_endpoints: Arc<Mutex<HashMap<String, bool>>>,
}
Loading