Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ assert_matches = { version = "1.5.0" }
env_logger = { version = "0.11" }
reqwest = { workspace = true }
rstest = { version = "0.23.0" }
temp-env = { version = "0.3.6" }
temp-env = { version = "0.3.6" , features=["async_closure"] }
4 changes: 3 additions & 1 deletion lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
//!
//! TODO: Top-level Overview of Endpoints/Functions

use crate::{discovery::Lease, service::ServiceSet, transports::etcd::EtcdPath};
use crate::{
config::HealthStatus, discovery::Lease, service::ServiceSet, transports::etcd::EtcdPath,
};

use super::{
error,
Expand Down
6 changes: 5 additions & 1 deletion lib/runtime/src/component/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ impl EndpointConfigBuilder {
.map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

// launch in primary runtime
let task = tokio::spawn(push_endpoint.start(service_endpoint));
let task = tokio::spawn(push_endpoint.start(
service_endpoint,
endpoint.name.clone(),
endpoint.drt().system_health.clone(),
));

// make the components service endpoint discovery in etcd

Expand Down
62 changes: 61 additions & 1 deletion lib/runtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ impl Default for WorkerConfig {
}
}

/// Health state of the runtime.
///
/// Values are serialized and deserialized in all-lowercase form, i.e.
/// `"ready"` and `"notready"`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// Reported as `"ready"`.
    Ready,
    /// Reported as `"notready"`.
    NotReady,
}

/// Runtime configuration
/// Defines the configuration for Tokio runtimes
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
Expand Down Expand Up @@ -88,6 +95,21 @@ pub struct RuntimeConfig {
#[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_enabled: bool,

/// Starting Health Status
/// Set this at runtime with environment variable DYN_SYSTEM_STARTING_HEALTH_STATUS
#[builder(default = "HealthStatus::NotReady")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub starting_health_status: HealthStatus,

/// Use Endpoint Health Status
/// When endpoint health status is used, the system health status
/// is the logical AND of the health of the individual endpoints.
/// Set this at runtime with the environment variable DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
/// whose value is the list of endpoints to consider for system health
#[builder(default = "vec![]")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub use_endpoint_health_status: Vec<String>,
}

impl fmt::Display for RuntimeConfig {
Expand All @@ -102,6 +124,16 @@ impl fmt::Display for RuntimeConfig {
write!(f, "system_host={}, ", self.system_host)?;
write!(f, "system_port={}, ", self.system_port)?;
write!(f, "system_enabled={}", self.system_enabled)?;
write!(
f,
"use_endpoint_health_status={:?}",
self.use_endpoint_health_status
)?;
write!(
f,
"starting_health_status={:?}",
self.starting_health_status
)?;

Ok(())
}
Expand Down Expand Up @@ -135,6 +167,8 @@ impl RuntimeConfig {
"HOST" => "system_host",
"PORT" => "system_port",
"ENABLED" => "system_enabled",
"USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
"STARTING_HEALTH_STATUS" => "starting_health_status",
_ => k.as_str(),
};
Some(mapped_key.into())
Expand All @@ -151,7 +185,7 @@ impl RuntimeConfig {
/// 2. /opt/dynamo/etc/runtime.toml
/// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
///
/// Environment variables are prefixed with `DYN_RUNTIME_`
/// Environment variables are prefixed with either `DYN_RUNTIME_` or `DYN_SYSTEM_`
pub fn from_settings() -> Result<RuntimeConfig> {
let config: RuntimeConfig = Self::figment().extract()?;
config.validate()?;
Expand All @@ -171,6 +205,8 @@ impl RuntimeConfig {
system_host: DEFAULT_SYSTEM_HOST.to_string(),
system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
}
}

Expand All @@ -196,6 +232,8 @@ impl Default for RuntimeConfig {
system_host: DEFAULT_SYSTEM_HOST.to_string(),
system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
}
}
}
Expand Down Expand Up @@ -372,6 +410,28 @@ mod tests {
});
}

/// Checks that `DYN_SYSTEM_STARTING_HEALTH_STATUS=ready` is picked up by
/// `RuntimeConfig::from_settings` and surfaces as `HealthStatus::Ready`.
#[test]
fn test_system_server_starting_health_status_ready() {
    temp_env::with_vars(
        vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
        || {
            let config = RuntimeConfig::from_settings().unwrap();
            // `assert_eq!` prints both sides on failure, which a bare
            // `assert!(a == b)` does not.
            assert_eq!(config.starting_health_status, HealthStatus::Ready);
        },
    );
}

/// Checks that a list-valued `DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS` is parsed
/// by `RuntimeConfig::from_settings` into `use_endpoint_health_status`.
#[test]
fn test_system_use_endpoint_health_status() {
    temp_env::with_vars(
        vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
        || {
            let config = RuntimeConfig::from_settings().unwrap();
            // `assert_eq!` prints the parsed list on failure, which a bare
            // `assert!(a == b)` does not.
            assert_eq!(config.use_endpoint_health_status, vec!["ready"]);
        },
    );
}

#[test]
fn test_is_truthy_and_falsey() {
// Test truthy values
Expand Down
12 changes: 10 additions & 2 deletions lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
ErrorContext,
};

use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, Weak, OK};
use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK};

use derive_getters::Dissolve;
use figment::error;
Expand Down Expand Up @@ -65,6 +65,14 @@ impl DistributedRuntime {
})
.await??;

let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let system_health = Arc::new(Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
)));

let distributed_runtime = Self {
runtime,
etcd_client,
Expand All @@ -74,10 +82,10 @@ impl DistributedRuntime {
is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())),
start_time: std::time::Instant::now(),
system_health,
};

// Start HTTP server for health and metrics (if enabled)
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
if config.system_server_enabled() {
let drt_arc = Arc::new(distributed_runtime.clone());
let runtime_clone = distributed_runtime.runtime.clone();
Expand Down
140 changes: 84 additions & 56 deletions lib/runtime/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::config::HealthStatus;
use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router};
use prometheus::{
proto, register_gauge_with_registry, Encoder, Gauge, Opts, Registry, TextEncoder,
};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -145,11 +148,28 @@ pub async fn spawn_http_server(
}

/// Health handler
///
/// Reads the health state held in the runtime's `system_health` and answers with a JSON
/// document containing:
/// - `status`: `"ready"` or `"notready"`
/// - `uptime`: runtime uptime in whole seconds
/// - `endpoints`: the second value returned by `get_health_status()`
///
/// The HTTP status is `200 OK` when healthy and `503 Service Unavailable` otherwise.
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
    let uptime = state.drt.uptime();
    let system_health = state.drt.system_health.lock().await;
    let (healthy, endpoints) = system_health.get_health_status();

    let (status_code, healthy_string) = if healthy {
        (StatusCode::OK, "ready")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "notready")
    };

    // Serialize once; the same string is both traced and returned.
    let body = json!({
        "status": healthy_string,
        "uptime": uptime.as_secs(),
        "endpoints": endpoints
    })
    .to_string();

    tracing::trace!("Response {}", body);

    (status_code, body)
}

/// Metrics handler with DistributedRuntime uptime
Expand Down Expand Up @@ -188,6 +208,7 @@ async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
use tokio::time::{sleep, Duration};

#[tokio::test]
Expand Down Expand Up @@ -264,65 +285,72 @@ mod tests {
assert!(response.contains("Total uptime of the DistributedRuntime in seconds"));
}

/*
#[rstest]
#[case("ready", 200, "ready")]
#[case("notready", 503, "notready")]
#[tokio::test]
async fn test_spawn_http_server_endpoints() {
async fn test_health_endpoints(
#[case] starting_health_status: &'static str,
#[case] expected_status: u16,
#[case] expected_body: &'static str,
) {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, server_handle) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_200, expect_body) in [
("/health", true, "OK"),
("/live", true, "OK"),
("/someRandomPathNotFoundHere", false, "Route not found"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
if expect_200 {
assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
} else {
assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
}
assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
cancel_token.cancel();
match server_handle.await {
Ok(_) => println!("[test] Server shut down normally"),
Err(e) => {
if e.is_panic() {
println!("[test] Server panicked: {:?}", e);
} else {
println!("[test] Server cancelled: {:?}", e);

// Closure call is needed here to satisfy async_with_vars

#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[(
"DYN_SYSTEM_STARTING_HEALTH_STATUS",
Some(starting_health_status),
)],
(async || {
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_status, expect_body) in [
("/health", expected_status, expected_body),
("/live", expected_status, expected_body),
("/someRandomPathNotFoundHere", 404, "Route not found"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
assert_eq!(
status, expect_status,
"Response: status={}, body={:?}",
status, body
);
assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
}
}
})(),
)
.await;
}
*/
}
Loading
Loading