Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
incremental
  • Loading branch information
nnshah1 committed Jul 18, 2025
commit 022955b43fb73c942f6c903c8cf9475b1be7ae52
108 changes: 98 additions & 10 deletions lib/runtime/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,15 @@ mod tests {
}

#[tokio::test]
async fn test_spawn_http_server_endpoints() {
async fn test_not_ready() {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
crate::DistributedRuntime::from_settings(runtime,vec!["never_start".to_string()].into())
.await
.unwrap(),
);
Expand All @@ -320,10 +320,9 @@ mod tests {
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_200, expect_body) in [
("/health", true, "ready"),
("/live", true, "ready"),
("/someRandomPathNotFoundHere", false, "Route not found"),
for (path, expect_status, expect_body) in [
("/health", 503, "not_ready"),
("/live", 503, "not_ready"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
Expand All @@ -334,18 +333,106 @@ mod tests {
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
if expect_200 {
assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
} else {
assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);

assert_eq!(status, expect_status, "Response: status={}, body={:?}", status, body);

assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
cancel_token.cancel();
match server_handle.await {
Ok(_) => println!("[test] Server shut down normally"),
Err(e) => {
if e.is_panic() {
println!("[test] Server panicked: {:?}", e);
} else {
println!("[test] Server cancelled: {:?}", e);
}
}
}
}

struct DelayedHandler {}

impl DelayedHandler {
fn new() -> Arc<Self> {
Arc::new(Self {})
}
}


#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for DelayedHandler {
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
"delayed handler"
}
}

#[tokio::test]
async fn test_delayed_ready() {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings(runtime,vec!["delay_start".to_string()].into())
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, server_handle) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_status, expect_body) in [
("/health", 503, "not_ready"),
("/live", 503, "not_ready"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);

assert_eq!(status, expect_status, "Response: status={}, body={:?}", status, body);

assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}

let ingress = Ingress::for_engine(DelayedHandler::new())?;


runtime.namespace(DEFAULT_NAMESPACE)?
.component("delayed")
.service_builder()
.create()
.await?
.endpoint("delay_start")
.endpoint_builder()
.handler(ingress)
.start()
.await



cancel_token.cancel();
match server_handle.await {
Ok(_) => println!("[test] Server shut down normally"),
Expand All @@ -359,3 +446,4 @@ mod tests {
}
}
}