-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Make streams owned by request/response that they are tied to. #6228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
c7b3f25
a72d8e0
14b25d2
22a2dd5
bf2c3ba
b855a4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,4 @@ | ||
| use crate::poll::Pollable; | ||
| use crate::r#struct::Stream; | ||
| use crate::streams::{InputStream, OutputStream, StreamError}; | ||
| use crate::WasiHttp; | ||
| use anyhow::{anyhow, bail}; | ||
|
|
@@ -74,34 +73,23 @@ impl crate::streams::Host for WasiHttp { | |
| this: OutputStream, | ||
| buf: Vec<u8>, | ||
| ) -> wasmtime::Result<Result<u64, StreamError>> { | ||
| let len = buf.len(); | ||
| match self.streams.get(&this) { | ||
| Some(st) => { | ||
| if st.closed { | ||
| bail!("stream is dropped!"); | ||
| } | ||
| let data = &st.data; | ||
| let mut new = bytes::BytesMut::with_capacity(data.len() + buf.len()); | ||
| new.put(data.clone()); | ||
| new.put(bytes::Bytes::from(buf.clone())); | ||
| self.streams.insert( | ||
| this, | ||
| Stream { | ||
| closed: false, | ||
| data: new.freeze(), | ||
| }, | ||
| ); | ||
| let new_len = st.data.len() + len; | ||
| let mut new = bytes::BytesMut::with_capacity(new_len); | ||
|
||
| new.put(st.data.clone()); | ||
| new.put(bytes::Bytes::from(buf)); | ||
| self.streams.insert(this, new.freeze().into()); | ||
| } | ||
| None => { | ||
| self.streams.insert( | ||
| this, | ||
| Stream { | ||
| closed: false, | ||
| data: bytes::Bytes::from(buf.clone()), | ||
| }, | ||
| ); | ||
| self.streams.insert(this, bytes::Bytes::from(buf).into()); | ||
| } | ||
| } | ||
| Ok(Ok(buf.len().try_into()?)) | ||
| Ok(Ok(len.try_into()?)) | ||
| } | ||
|
|
||
| fn write_zeroes( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,7 @@ use crate::types::{Method, Scheme}; | |
| use bytes::Bytes; | ||
| use std::collections::HashMap; | ||
|
|
||
| #[derive(Clone)] | ||
| #[derive(Clone, Default)] | ||
| pub struct Stream { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at the usages of this struct, it seems that a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added. |
||
| pub closed: bool, | ||
| pub data: Bytes, | ||
|
|
@@ -74,9 +74,15 @@ impl ActiveResponse { | |
|
|
||
| impl Stream { | ||
| pub fn new() -> Self { | ||
| Self::default() | ||
| } | ||
| } | ||
|
|
||
| impl From<Bytes> for Stream { | ||
| fn from(bytes: Bytes) -> Self { | ||
| Self { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we had a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
| closed: false, | ||
| data: Bytes::new(), | ||
| data: bytes, | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.