Skip to content

Commit c1f982e

Browse files
committed
wasi-http: Allow embedder to manage outgoing connections
This adds a new `send_request` method to `WasiHttpView`, allowing embedders to override the default implementation with their own if the desire. The default implementation behaves exactly as before. I've also added a few new `wasi-http` tests: one to test the above, and two others to test streaming and concurrency. These tests are ports of the `test_wasi_http_echo` and `test_wasi_http_hash_all` tests in the [Spin](https://github.com/fermyon/spin) integration test suite. The component they instantiate is likewise ported from the Spin `wasi-http-rust-streaming-outgoing-body` component. Fixes #7259 Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 5c7ed43 commit c1f982e

File tree

7 files changed

+798
-139
lines changed

7 files changed

+798
-139
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/test-programs/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ wasi = "0.11.0"
1212
wit-bindgen = { workspace = true, features = ['default'] }
1313
libc = { workspace = true }
1414
getrandom = "0.2.9"
15+
futures = { workspace = true, default-features = false, features = ['alloc'] }
16+
url = { workspace = true }
17+
sha2 = "0.10.2"
18+
base64 = "0.21.0"
Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
use anyhow::{bail, Result};
2+
use bindings::wasi::http::types::{
3+
Fields, IncomingRequest, Method, OutgoingBody, OutgoingRequest, OutgoingResponse,
4+
ResponseOutparam, Scheme,
5+
};
6+
use futures::{stream, SinkExt, StreamExt, TryStreamExt};
7+
use url::Url;
8+
9+
mod bindings {
10+
use super::Handler;
11+
12+
wit_bindgen::generate!({
13+
path: "../wasi-http/wit",
14+
world: "wasi:http/proxy",
15+
exports: {
16+
"wasi:http/incoming-handler": Handler,
17+
},
18+
});
19+
}
20+
21+
const MAX_CONCURRENCY: usize = 16;
22+
23+
struct Handler;
24+
25+
impl bindings::exports::wasi::http::incoming_handler::Guest for Handler {
26+
fn handle(request: IncomingRequest, response_out: ResponseOutparam) {
27+
executor::run(async move {
28+
handle_request(request, response_out).await;
29+
})
30+
}
31+
}
32+
33+
async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam) {
34+
let headers = request.headers().entries();
35+
36+
match (request.method(), request.path_with_query().as_deref()) {
37+
(Method::Get, Some("/hash-all")) => {
38+
let urls = headers.iter().filter_map(|(k, v)| {
39+
(k == "url")
40+
.then_some(v)
41+
.and_then(|v| std::str::from_utf8(v).ok())
42+
.and_then(|v| Url::parse(v).ok())
43+
});
44+
45+
let results = urls.map(|url| async move {
46+
let result = hash(&url).await;
47+
(url, result)
48+
});
49+
50+
let mut results = stream::iter(results).buffer_unordered(MAX_CONCURRENCY);
51+
52+
let response = OutgoingResponse::new(
53+
200,
54+
&Fields::new(&[("content-type".to_string(), b"text/plain".to_vec())]),
55+
);
56+
57+
let mut body =
58+
executor::outgoing_body(response.write().expect("response should be writable"));
59+
60+
ResponseOutparam::set(response_out, Ok(response));
61+
62+
while let Some((url, result)) = results.next().await {
63+
let payload = match result {
64+
Ok(hash) => format!("{url}: {hash}\n"),
65+
Err(e) => format!("{url}: {e:?}\n"),
66+
}
67+
.into_bytes();
68+
69+
if let Err(e) = body.send(payload).await {
70+
eprintln!("Error sending payload: {e}");
71+
}
72+
}
73+
}
74+
75+
(Method::Post, Some("/echo")) => {
76+
let response = OutgoingResponse::new(
77+
200,
78+
&Fields::new(
79+
&headers
80+
.into_iter()
81+
.filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
82+
.collect::<Vec<_>>(),
83+
),
84+
);
85+
86+
let mut body =
87+
executor::outgoing_body(response.write().expect("response should be writable"));
88+
89+
ResponseOutparam::set(response_out, Ok(response));
90+
91+
let mut stream =
92+
executor::incoming_body(request.consume().expect("request should be readable"));
93+
94+
while let Some(chunk) = stream.next().await {
95+
match chunk {
96+
Ok(chunk) => {
97+
if let Err(e) = body.send(chunk).await {
98+
eprintln!("Error sending body: {e}");
99+
break;
100+
}
101+
}
102+
Err(e) => {
103+
eprintln!("Error receiving body: {e}");
104+
break;
105+
}
106+
}
107+
}
108+
}
109+
110+
_ => {
111+
let response = OutgoingResponse::new(405, &Fields::new(&[]));
112+
113+
let body = response.write().expect("response should be writable");
114+
115+
ResponseOutparam::set(response_out, Ok(response));
116+
117+
OutgoingBody::finish(body, None);
118+
}
119+
}
120+
}
121+
122+
async fn hash(url: &Url) -> Result<String> {
123+
let request = OutgoingRequest::new(
124+
&Method::Get,
125+
Some(url.path()),
126+
Some(&match url.scheme() {
127+
"http" => Scheme::Http,
128+
"https" => Scheme::Https,
129+
scheme => Scheme::Other(scheme.into()),
130+
}),
131+
Some(url.authority()),
132+
&Fields::new(&[]),
133+
);
134+
135+
let response = executor::outgoing_request_send(request).await?;
136+
137+
let status = response.status();
138+
139+
if !(200..300).contains(&status) {
140+
bail!("unexpected status: {status}");
141+
}
142+
143+
let mut body =
144+
executor::incoming_body(response.consume().expect("response should be readable"));
145+
146+
use sha2::Digest;
147+
let mut hasher = sha2::Sha256::new();
148+
while let Some(chunk) = body.try_next().await? {
149+
hasher.update(&chunk);
150+
}
151+
152+
use base64::Engine;
153+
Ok(base64::engine::general_purpose::STANDARD_NO_PAD.encode(hasher.finalize()))
154+
}
155+
156+
// Technically this should not be here for a proxy, but given the current
157+
// framework for tests it's required since this file is built as a `bin`
158+
fn main() {}
159+
160+
mod executor {
161+
use super::bindings::wasi::{
162+
http::{
163+
outgoing_handler,
164+
types::{
165+
self, IncomingBody, IncomingResponse, InputStream, OutgoingBody, OutgoingRequest,
166+
OutputStream,
167+
},
168+
},
169+
io::{self, streams::StreamError},
170+
};
171+
use anyhow::{anyhow, Error, Result};
172+
use futures::{future, sink, stream, Sink, Stream};
173+
use std::{
174+
cell::RefCell,
175+
future::Future,
176+
mem,
177+
rc::Rc,
178+
sync::{Arc, Mutex},
179+
task::{Context, Poll, Wake, Waker},
180+
};
181+
182+
const READ_SIZE: u64 = 16 * 1024;
183+
184+
static WAKERS: Mutex<Vec<(io::poll::Pollable, Waker)>> = Mutex::new(Vec::new());
185+
186+
pub fn run<T>(future: impl Future<Output = T>) -> T {
187+
futures::pin_mut!(future);
188+
189+
struct DummyWaker;
190+
191+
impl Wake for DummyWaker {
192+
fn wake(self: Arc<Self>) {}
193+
}
194+
195+
let waker = Arc::new(DummyWaker).into();
196+
197+
loop {
198+
match future.as_mut().poll(&mut Context::from_waker(&waker)) {
199+
Poll::Pending => {
200+
let mut new_wakers = Vec::new();
201+
202+
let wakers = mem::take::<Vec<_>>(&mut WAKERS.lock().unwrap());
203+
204+
assert!(!wakers.is_empty());
205+
206+
let pollables = wakers
207+
.iter()
208+
.map(|(pollable, _)| pollable)
209+
.collect::<Vec<_>>();
210+
211+
let mut ready = vec![false; wakers.len()];
212+
213+
for index in io::poll::poll_list(&pollables) {
214+
ready[usize::try_from(index).unwrap()] = true;
215+
}
216+
217+
for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) {
218+
if ready {
219+
waker.wake()
220+
} else {
221+
new_wakers.push((pollable, waker));
222+
}
223+
}
224+
225+
*WAKERS.lock().unwrap() = new_wakers;
226+
}
227+
Poll::Ready(result) => break result,
228+
}
229+
}
230+
}
231+
232+
pub fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error> {
233+
struct Outgoing(Option<(OutputStream, OutgoingBody)>);
234+
235+
impl Drop for Outgoing {
236+
fn drop(&mut self) {
237+
if let Some((stream, body)) = self.0.take() {
238+
drop(stream);
239+
OutgoingBody::finish(body, None);
240+
}
241+
}
242+
}
243+
244+
let stream = body.write().expect("response body should be writable");
245+
let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body)))));
246+
247+
sink::unfold((), {
248+
move |(), chunk: Vec<u8>| {
249+
future::poll_fn({
250+
let mut offset = 0;
251+
let mut flushing = false;
252+
let pair = pair.clone();
253+
254+
move |context| {
255+
let pair = pair.borrow();
256+
let (stream, _) = &pair.0.as_ref().unwrap();
257+
258+
loop {
259+
match stream.check_write() {
260+
Ok(0) => {
261+
WAKERS
262+
.lock()
263+
.unwrap()
264+
.push((stream.subscribe(), context.waker().clone()));
265+
266+
break Poll::Pending;
267+
}
268+
Ok(count) => {
269+
if offset == chunk.len() {
270+
if flushing {
271+
break Poll::Ready(Ok(()));
272+
} else {
273+
stream.flush().expect("stream should be flushable");
274+
flushing = true;
275+
}
276+
} else {
277+
let count = usize::try_from(count)
278+
.unwrap()
279+
.min(chunk.len() - offset);
280+
281+
match stream.write(&chunk[offset..][..count]) {
282+
Ok(()) => {
283+
offset += count;
284+
}
285+
Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
286+
}
287+
}
288+
}
289+
Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
290+
}
291+
}
292+
}
293+
})
294+
}
295+
})
296+
}
297+
298+
pub fn outgoing_request_send(
299+
request: OutgoingRequest,
300+
) -> impl Future<Output = Result<IncomingResponse, types::Error>> {
301+
future::poll_fn({
302+
let response = outgoing_handler::handle(request, None);
303+
304+
move |context| match &response {
305+
Ok(response) => {
306+
if let Some(response) = response.get() {
307+
Poll::Ready(response.unwrap())
308+
} else {
309+
WAKERS
310+
.lock()
311+
.unwrap()
312+
.push((response.subscribe(), context.waker().clone()));
313+
Poll::Pending
314+
}
315+
}
316+
Err(error) => Poll::Ready(Err(error.clone())),
317+
}
318+
})
319+
}
320+
321+
pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>> {
322+
struct Incoming(Option<(InputStream, IncomingBody)>);
323+
324+
impl Drop for Incoming {
325+
fn drop(&mut self) {
326+
if let Some((stream, body)) = self.0.take() {
327+
drop(stream);
328+
IncomingBody::finish(body);
329+
}
330+
}
331+
}
332+
333+
stream::poll_fn({
334+
let stream = body.stream().expect("response body should be readable");
335+
let pair = Incoming(Some((stream, body)));
336+
337+
move |context| {
338+
if let Some((stream, _)) = &pair.0 {
339+
match stream.read(READ_SIZE) {
340+
Ok(buffer) => {
341+
if buffer.is_empty() {
342+
WAKERS
343+
.lock()
344+
.unwrap()
345+
.push((stream.subscribe(), context.waker().clone()));
346+
Poll::Pending
347+
} else {
348+
Poll::Ready(Some(Ok(buffer)))
349+
}
350+
}
351+
Err(StreamError::Closed) => Poll::Ready(None),
352+
Err(StreamError::LastOperationFailed(error)) => {
353+
Poll::Ready(Some(Err(anyhow!("{}", error.to_debug_string()))))
354+
}
355+
}
356+
} else {
357+
Poll::Ready(None)
358+
}
359+
}
360+
})
361+
}
362+
}

0 commit comments

Comments
 (0)