Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/test-programs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ os_pipe = "0.9"
anyhow = { workspace = true }
wat = { workspace = true }
cap-std = { workspace = true }
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
tokio = { version = "1.8.0", features = ["net", "rt-multi-thread"] }
wasmtime-wasi-http = { workspace = true }
hyper = { version = "1.0.0-rc.3", features = ["full"] }
http = { version = "0.2.9" }
http-body = "1.0.0-rc.2"
http-body-util = "0.1.0-rc.2"

[features]
test_programs = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,59 @@ use wasmtime::{Config, Engine, Linker, Module, Store};
use wasmtime_wasi::{sync::WasiCtxBuilder, WasiCtx};
use wasmtime_wasi_http::WasiHttp;

use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use hyper::server::conn::http1;
use hyper::{body::Bytes, service::service_fn, Request, Response};
use std::{error::Error, net::SocketAddr};
use tokio::net::TcpListener;

async fn test(
req: Request<hyper::body::Incoming>,
) -> http::Result<Response<BoxBody<Bytes, hyper::Error>>> {
let method = req.method().to_string();
Response::builder()
.status(http::StatusCode::OK)
.header("x-wasmtime-test-method", method)
.body(req.into_body().boxed())
}

async fn async_run_serve() -> Result<(), Box<dyn Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

let listener = TcpListener::bind(addr).await?;

loop {
let (stream, _) = listener.accept().await?;

tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(stream, service_fn(test))
.await
{
println!("Error serving connection: {:?}", err);
}
});
}
}

fn run_server() -> Result<(), Box<dyn Error + Send + Sync>> {
let rt = tokio::runtime::Runtime::new()?;
let _ent = rt.enter();

rt.block_on(async_run_serve())?;
Ok(())
}

pub fn instantiate_inherit_stdio(
data: &[u8],
bin_name: &str,
_workspace: Option<&Path>,
) -> anyhow::Result<()> {
let _thread = std::thread::spawn(|| {
run_server().unwrap();
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to break as soon as we run more than one test in a process, which I hope we will do soon. I'll work on the structural changes to the test suite to work that out soon, but its ok for now


let config = Config::new();
let engine = Engine::new(&config)?;
let module = Module::new(&engine, &data).context("failed to create wasm module")?;
Expand Down
47 changes: 35 additions & 12 deletions crates/test-programs/wasi-http-tests/src/bin/outbound_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ impl fmt::Debug for Response {
}
}

impl Response {
fn header(&self, name: &str) -> Option<&String> {
self.headers.iter().find_map(
|(k, v)| if k == name { Some(v) } else { None }
)
}
}

fn request(
method: types::MethodParam<'_>,
scheme: types::SchemeParam<'_>,
Expand All @@ -47,9 +55,13 @@ fn request(
body_cursor += written as usize;
}

let future_response = default_outgoing_http::handle(request, None);

// TODO: The current implementation requires this drop after the request is sent.
// The ownership semantics are unclear in wasi-http we should clarify exactly what is
// supposed to happen here.
streams::drop_output_stream(request_body);

let future_response = default_outgoing_http::handle(request, None);
// TODO: we could create a pollable from the future_response and poll on it here to test that
// its available immediately

Expand All @@ -71,7 +83,6 @@ fn request(

let body_stream = types::incoming_response_consume(incoming_response)
.map_err(|()| anyhow!("incoming response has no body stream"))?;
types::drop_incoming_response(incoming_response);

let mut body = Vec::new();
let mut eof = false;
Expand All @@ -81,6 +92,7 @@ fn request(
body.append(&mut body_chunk);
}
streams::drop_input_stream(body_stream);
types::drop_incoming_response(incoming_response);

Ok(Response {
status,
Expand All @@ -89,44 +101,55 @@ fn request(
})
}

fn main() -> Result<()> {
fn main() -> Result<()> {
let missing = "MISSING".to_string();
let r1 = request(
types::MethodParam::Get,
types::SchemeParam::Http,
"postman-echo.com",
"localhost:3000",
"/get",
"?some=arg?goes=here",
&[],
)
.context("postman-echo /get")?;
.context("localhost:3000 /get")?;

println!("postman-echo /get: {r1:?}");
println!("localhost:3000 /get: {r1:?}");
assert_eq!(r1.status, 200);
let method = r1.header("x-wasmtime-test-method").unwrap_or(&missing);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests are the one place in Rust I prefer to write code that panics on failure - after all, that is what assert_eq! is doing. So in this case I would write this as let method = r1.header("x-wasmtime-test-method").unwrap(); rather than mess around with having the alternative value around with the correct type, which is clunky.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'm still learning idiomatic rust. I will correct this in a follow-on PR.

assert_eq!(method, "GET");
assert_eq!(r1.body, b"");

let r2 = request(
types::MethodParam::Post,
types::SchemeParam::Http,
"postman-echo.com",
"localhost:3000",
"/post",
"",
b"{\"foo\": \"bar\"}",
)
.context("postman-echo /post")?;
.context("localhost:3000 /post")?;

println!("postman-echo /post: {r2:?}");
println!("localhost:3000 /post: {r2:?}");
assert_eq!(r2.status, 200);
let method = r2.header("x-wasmtime-test-method").unwrap_or(&missing);
assert_eq!(method, "POST");
assert_eq!(r2.body, b"{\"foo\": \"bar\"}");

let r3 = request(
types::MethodParam::Put,
types::SchemeParam::Http,
"postman-echo.com",
"localhost:3000",
"/put",
"",
&[],
)
.context("postman-echo /put")?;
.context("localhost:3000 /put")?;

println!("postman-echo /put: {r3:?}");
println!("localhost:3000 /put: {r3:?}");
assert_eq!(r3.status, 200);
let method = r3.header("x-wasmtime-test-method").unwrap_or(&missing);
assert_eq!(method, "PUT");
assert_eq!(r3.body, b"");

Ok(())
}
18 changes: 18 additions & 0 deletions crates/wasi-http/src/http_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use http_body_util::{BodyExt, Full};
use hyper::Method;
use hyper::Request;
use std::collections::HashMap;
#[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))]
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -201,6 +202,23 @@ impl WasiHttp {
if let Some(chunk) = frame.data_ref() {
buf.put(chunk.clone());
}
if let Some(trailers) = frame.trailers_ref() {
response.trailers = self.fields_id_base;
self.fields_id_base += 1;
let mut map: HashMap<String, Vec<String>> = HashMap::new();
for (name, value) in trailers.iter() {
let key = name.to_string();
match map.get_mut(&key) {
Some(vec) => vec.push(value.to_str()?.to_string()),
None => {
let mut vec = Vec::new();
vec.push(value.to_str()?.to_string());
map.insert(key, vec);
}
};
}
self.fields.insert(response.trailers, map);
}
}
response.body = self.streams_id_base;
self.streams_id_base = self.streams_id_base + 1;
Expand Down
56 changes: 40 additions & 16 deletions crates/wasi-http/src/streams_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::poll::Pollable;
use crate::streams::{InputStream, OutputStream, StreamError};
use crate::WasiHttp;
use anyhow::{anyhow, bail};
use bytes::BufMut;
use std::vec::Vec;

impl crate::streams::Host for WasiHttp {
Expand All @@ -27,22 +28,31 @@ impl crate::streams::Host for WasiHttp {

fn skip(
&mut self,
_this: InputStream,
_len: u64,
stream: InputStream,
len: u64,
) -> wasmtime::Result<Result<(u64, bool), StreamError>> {
bail!("unimplemented: skip");
let s = self
.streams
.get_mut(&stream)
.ok_or_else(|| anyhow!("stream not found: {stream}"))?;
if len == 0 {
Ok(Ok((0, s.len() > 0)))
} else if s.len() > len.try_into()? {
s.truncate(len.try_into()?);
Ok(Ok((len, false)))
} else {
let bytes = s.len();
s.truncate(s.len());
Ok(Ok((bytes.try_into()?, true)))
}
}

fn subscribe_to_input_stream(&mut self, _this: InputStream) -> wasmtime::Result<Pollable> {
bail!("unimplemented: subscribe_to_input_stream");
}

fn drop_input_stream(&mut self, stream: InputStream) -> wasmtime::Result<()> {
let r = self
.streams
.get_mut(&stream)
.ok_or_else(|| anyhow!("no such input-stream {stream}"))?;
r.truncate(0);
self.streams.remove(&stream);
Ok(())
}

Expand All @@ -51,17 +61,32 @@ impl crate::streams::Host for WasiHttp {
this: OutputStream,
buf: Vec<u8>,
) -> wasmtime::Result<Result<u64, StreamError>> {
// TODO: Make this a real write not a replace.
self.streams.insert(this, bytes::Bytes::from(buf.clone()));
match self.streams.get(&this) {
Some(data) => {
let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len());
new.put(data.clone());
new.put(bytes::Bytes::from(buf.clone()));
self.streams.insert(this, new.freeze());
}
None => {
self.streams.insert(this, bytes::Bytes::from(buf.clone()));
}
}
Ok(Ok(buf.len().try_into()?))
}

fn write_zeroes(
&mut self,
_this: OutputStream,
_len: u64,
this: OutputStream,
len: u64,
) -> wasmtime::Result<Result<u64, StreamError>> {
bail!("unimplemented: write_zeroes");
let mut data = Vec::with_capacity(len.try_into()?);
let mut i = 0;
while i < len {
data.push(0);
i = i + 1;
}
self.write(this, data)
}

fn splice(
Expand All @@ -85,9 +110,8 @@ impl crate::streams::Host for WasiHttp {
bail!("unimplemented: subscribe_to_output_stream");
}

fn drop_output_stream(&mut self, _this: OutputStream) -> wasmtime::Result<()> {
//bail!("unimplemented: drop_output_stream");
//FIXME: intentionally ignoring
fn drop_output_stream(&mut self, stream: OutputStream) -> wasmtime::Result<()> {
self.streams.remove(&stream);
Ok(())
}
}
2 changes: 2 additions & 0 deletions crates/wasi-http/src/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct ActiveResponse {
pub status: u16,
pub body: u32,
pub response_headers: HashMap<String, Vec<String>>,
pub trailers: u32,
}

impl ActiveRequest {
Expand All @@ -60,6 +61,7 @@ impl ActiveResponse {
status: 0,
body: 0,
response_headers: HashMap::new(),
trailers: 0,
}
}
}
Expand Down
18 changes: 14 additions & 4 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,16 @@ impl crate::types::Host for WasiHttp {
self.fields.insert(id, m.clone());
Ok(id)
}
fn finish_incoming_stream(&mut self, _s: IncomingStream) -> wasmtime::Result<Option<Trailers>> {
bail!("unimplemented: finish_incoming_stream")
fn finish_incoming_stream(&mut self, s: IncomingStream) -> wasmtime::Result<Option<Trailers>> {
for (_, value) in self.responses.iter() {
if value.body == s {
return match value.trailers {
0 => Ok(None),
_ => Ok(Some(value.trailers)),
};
}
}
bail!("unknown stream!")
}
fn finish_outgoing_stream(
&mut self,
Expand Down Expand Up @@ -181,8 +189,10 @@ impl crate::types::Host for WasiHttp {
.requests
.get_mut(&request)
.ok_or_else(|| anyhow!("unknown request: {request}"))?;
req.body = self.streams_id_base;
self.streams_id_base = self.streams_id_base + 1;
if req.body == 0 {
req.body = self.streams_id_base;
self.streams_id_base = self.streams_id_base + 1;
}
Ok(Ok(req.body))
}
fn drop_response_outparam(&mut self, _response: ResponseOutparam) -> wasmtime::Result<()> {
Expand Down