Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updated to use functions on trait
  • Loading branch information
nnshah1 committed Jul 21, 2025
commit 9eba58632924410797a1a6ea196743c9a347f778
1 change: 0 additions & 1 deletion lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ enum ModelType {
#[pymethods]
impl DistributedRuntime {
#[new]
#[pyo3(signature = (event_loop, is_static))]
fn new(event_loop: PyObject, is_static: bool) -> PyResult<Self> {
let worker = rs::Worker::from_settings().map_err(to_pyerr)?;
INIT.get_or_try_init(|| {
Expand Down
2 changes: 1 addition & 1 deletion lib/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ assert_matches = { version = "1.5.0" }
env_logger = { version = "0.11" }
reqwest = { workspace = true }
rstest = { version = "0.23.0" }
temp-env = { version = "0.3.6" }
temp-env = { version = "0.3.6" , features=["async_closure"] }
2 changes: 1 addition & 1 deletion lib/runtime/src/component/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl EndpointConfigBuilder {
let task = tokio::spawn(push_endpoint.start(
service_endpoint,
endpoint.name.clone(),
endpoint.drt().system_health.endpoint_health.clone(),
endpoint.drt().system_health.clone(),
));

// make the components service endpoint discovery in etcd
Expand Down
26 changes: 24 additions & 2 deletions lib/runtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl RuntimeConfig {
system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![], // UPDATED DEFAULT TO EMPTY VECTOR
use_endpoint_health_status: vec![],
}
}

Expand All @@ -228,7 +228,7 @@ impl Default for RuntimeConfig {
system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![], // UPDATED DEFAULT TO EMPTY VECTOR
use_endpoint_health_status: vec![],
}
}
}
Expand Down Expand Up @@ -405,6 +405,28 @@ mod tests {
});
}

#[test]
fn test_system_server_starting_health_status_ready() {
temp_env::with_vars(
vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert!(config.starting_health_status == HealthStatus::Ready);
},
);
}

#[test]
fn test_system_use_endpoint_health_status() {
temp_env::with_vars(
vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert!(config.use_endpoint_health_status == vec!["ready"]);
},
);
}

#[test]
fn test_is_truthy_and_falsey() {
// Test truthy values
Expand Down
6 changes: 4 additions & 2 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ impl DistributedRuntime {
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone();

let system_health = SystemHealth::new(starting_health_status, use_endpoint_health_status);
let system_health = Arc::new(Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
)));

let distributed_runtime = Self {
runtime,
Expand Down
119 changes: 86 additions & 33 deletions lib/runtime/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,41 +151,15 @@ pub async fn spawn_http_server(
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
let uptime = state.drt.uptime();
let map = state.drt.system_health.endpoint_health.lock().await;
let mut healthy = true;
let mut status_code = StatusCode::OK;
let mut endpoints: HashMap<String, String> = HashMap::new();

let use_endpoint_health_status = &state.drt.system_health.use_endpoint_health_status;

// List all endpoint statuses
for (endpoint, ready) in &*map {
endpoints.insert(
endpoint.clone(),
if *ready == HealthStatus::Ready {
"ready".to_string()
} else {
"notready".to_string()
},
);
}

// Determine overall health based on use_endpoint_health_status
if !use_endpoint_health_status.is_empty() {
healthy = use_endpoint_health_status.iter().all(|endpoint| {
map.get(endpoint)
.map_or(false, |status| *status == HealthStatus::Ready)
});
} else {
// Fallback to system health if no endpoints specified
healthy = state.drt.system_health.system_health == HealthStatus::Ready;
}
let system_health = state.drt.system_health.lock().await;
let (healthy, endpoints) = system_health.get_health_status();

let healthy_string = if healthy { "ready" } else { "notready" };

if !healthy {
status_code = StatusCode::SERVICE_UNAVAILABLE;
}
let status_code = if healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};

let response = json!({
"status": healthy_string,
Expand Down Expand Up @@ -234,6 +208,7 @@ async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
use tokio::time::{sleep, Duration};

#[tokio::test]
Expand Down Expand Up @@ -309,4 +284,82 @@ mod tests {
assert!(response.contains("dynamo_runtime_uptime_seconds"));
assert!(response.contains("Total uptime of the DistributedRuntime in seconds"));
}

    /// End-to-end check of the system HTTP server's health endpoints.
    ///
    /// Parameterized over the starting health status: a server started with
    /// "ready" must report 200/"ready" on /health and /live, while one started
    /// with "notready" must report 503/"notready". Unknown paths return 404.
    #[rstest]
    #[case("ready", 200, "ready")]
    #[case("notready", 503, "notready")]
    #[tokio::test]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
    ) {
        use std::sync::Arc;
        use tokio::time::sleep;
        use tokio_util::sync::CancellationToken;

        // async_with_vars scopes the env var to this async block so cases
        // (and other tests) don't leak state into each other.
        temp_env::async_with_vars(
            [(
                "DYN_SYSTEM_STARTING_HEALTH_STATUS",
                Some(starting_health_status),
            )],
            (async || {
                let runtime = crate::Runtime::from_settings().unwrap();
                // Skip etcd discovery: the test only needs the HTTP surface.
                let drt = Arc::new(
                    crate::DistributedRuntime::from_settings_without_discovery(runtime)
                        .await
                        .unwrap(),
                );
                let cancel_token = CancellationToken::new();
                // Port 0 lets the OS pick a free port; `addr` carries the real one.
                let (addr, server_handle) =
                    spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
                        .await
                        .unwrap();
                println!("[test] Waiting for server to start...");
                // NOTE(review): fixed 1s sleep is a startup race guard — a
                // readiness poll would be more robust; confirm if flaky.
                sleep(std::time::Duration::from_millis(1000)).await;
                println!("[test] Server should be up, starting requests...");
                let client = reqwest::Client::new();
                // Each tuple: (path, expected HTTP status, substring expected in body).
                for (path, expect_status, expect_body) in [
                    ("/health", expected_status, expected_body),
                    ("/live", expected_status, expected_body),
                    ("/someRandomPathNotFoundHere", 404, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
                // Trigger graceful shutdown and report how the task ended.
                cancel_token.cancel();
                match server_handle.await {
                    Ok(_) => println!("[test] Server shut down normally"),
                    Err(e) => {
                        if e.is_panic() {
                            println!("[test] Server panicked: {:?}", e);
                        } else {
                            println!("[test] Server cancelled: {:?}", e);
                        }
                    }
                }
            })(),
        )
        .await;
    }
}
46 changes: 42 additions & 4 deletions lib/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ pub struct Runtime {
cancellation_token: CancellationToken,
}

// Current Health Status
/// Current Health Status
/// If use_endpoint_health_status is set then
/// initialize the endpoint_health hashmap to the
/// starting health status
#[derive(Clone)]
pub struct SystemHealth {
    // Overall system-level health; consulted when no endpoint list is configured.
    system_health: HealthStatus,
    // Per-endpoint health statuses, keyed by endpoint name.
    endpoint_health: HashMap<String, HealthStatus>,
    // Endpoint names that, when non-empty, determine overall health instead
    // of `system_health` (all listed endpoints must be Ready).
    use_endpoint_health_status: Vec<String>,
}

Expand All @@ -95,10 +98,45 @@ impl SystemHealth {
}
SystemHealth {
system_health: starting_health_status,
endpoint_health: Arc::new(Mutex::new(endpoint_health)),
endpoint_health,
use_endpoint_health_status,
}
}
    /// Sets the overall system-level health status.
    pub fn set_health_status(&mut self, status: HealthStatus) {
        self.system_health = status;
    }

    /// Records the health status for a single named endpoint,
    /// inserting or overwriting its entry in the endpoint map.
    pub fn set_endpoint_health_status(&mut self, endpoint: String, status: HealthStatus) {
        self.endpoint_health.insert(endpoint, status);
    }

/// Returns the overall health status and endpoint health statuses
pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
let mut endpoints: HashMap<String, String> = HashMap::new();
for (endpoint, ready) in &self.endpoint_health {
endpoints.insert(
endpoint.clone(),
if *ready == HealthStatus::Ready {
"ready".to_string()
} else {
"notready".to_string()
},
);
}

let healthy;
if !self.use_endpoint_health_status.is_empty() {
healthy = self.use_endpoint_health_status.iter().all(|endpoint| {
self.endpoint_health
.get(endpoint)
.map_or(false, |status| *status == HealthStatus::Ready)
});
} else {
healthy = self.system_health == HealthStatus::Ready;
}

(healthy, endpoints)
}
}

/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
Expand Down Expand Up @@ -130,5 +168,5 @@ pub struct DistributedRuntime {
start_time: std::time::Instant,

// Health Status
system_health: SystemHealth,
system_health: Arc<Mutex<SystemHealth>>,
}
21 changes: 10 additions & 11 deletions lib/runtime/src/pipeline/network/ingress/push_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use super::*;
use crate::config::HealthStatus;
use crate::protocols::LeaseId;
use crate::SystemHealth;
use anyhow::Result;
use async_nats::service::endpoint::Endpoint;
use derive_builder::Builder;
Expand All @@ -44,18 +45,17 @@ impl PushEndpoint {
self,
endpoint: Endpoint,
endpoint_name: String,
endpoint_health: Arc<Mutex<HashMap<String, HealthStatus>>>,
system_health: Arc<Mutex<SystemHealth>>,
) -> Result<()> {
let mut endpoint = endpoint;

let inflight = Arc::new(AtomicU64::new(0));
let notify = Arc::new(Notify::new());

{
let mut mut_endpoint_health = endpoint_health.lock().await;

mut_endpoint_health.insert(endpoint_name.clone(), HealthStatus::Ready);
}
system_health
.lock()
.await
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::Ready);

loop {
let req = tokio::select! {
Expand Down Expand Up @@ -111,11 +111,10 @@ impl PushEndpoint {
}
}

{
let mut mut_endpoint_health = endpoint_health.lock().await;

mut_endpoint_health.insert(endpoint_name.clone(), HealthStatus::NotReady);
}
system_health
.lock()
.await
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::NotReady);

// await for all inflight requests to complete
tracing::info!(
Expand Down
Loading