Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8539425
Stabilize unsigned_abs
jhpratt Jan 13, 2021
edf2e37
Use unsigned_abs throughout repository
jhpratt Jan 13, 2021
0c8db16
Add `core::stream::Stream`
yoshuawuyts Nov 13, 2020
e94cf57
Make functional record update/struct update syntax works inside closu…
null-sleep Jan 23, 2021
a1b1132
Remove `Stream::next`
yoshuawuyts Jan 15, 2021
d00c850
BTreeMap: correct tests for alternative choices of B
ssomers Jan 20, 2021
328abfb
Slight simplification of chars().count()
gilescope Jan 26, 2021
425a70a
Removing if so it's more like the previous implementation.
gilescope Jan 26, 2021
c07e558
Let's try the most idiomatic way.
gilescope Jan 26, 2021
a623ea5
Same instructions, but simpler.
gilescope Jan 26, 2021
063b427
Bump rustfmt version
Mark-Simulacrum Jan 9, 2021
5d73918
Clone entire `TokenCursor` when collecting tokens
Aaron1011 Jan 28, 2021
ad14924
path trimming: ignore type aliases
da-x Jan 28, 2021
9dc0d1d
path trimming: disable on src/test/run-make-fulldeps/coverage-spanview
da-x Jan 28, 2021
a6fa92c
Balance sidebar `Deref` cycle check with main content
jryans Jan 28, 2021
f620b5c
rustdoc: Remove unnecessary optional
camelid Jan 29, 2021
02094f9
Updated Vec::splice documentation
SOF3 Jan 29, 2021
5e983d7
Add a test for syntax like: ..t.s
null-sleep Jan 29, 2021
63714af
update rustfmt to v1.4.34
calebcartwright Jan 29, 2021
d8b5745
Treat nightlies for a version as complete
est31 Jan 28, 2021
dd18c48
Add tests
est31 Jan 28, 2021
08141a5
Add missiong variants in match binding
GuillaumeGomez Jan 29, 2021
13ffa43
rename raw_const/mut -> const/mut_addr_of, and stabilize them
RalfJung Jan 10, 2021
718398c
Fix typo in pat.rs
eltociear Jan 29, 2021
2987f03
Rollup merge of #78052 - da-x:path-trimming-type-aliases, r=davidtwco
jonas-schievink Jan 29, 2021
e711d79
Rollup merge of #79023 - yoshuawuyts:stream, r=KodrAus
jonas-schievink Jan 29, 2021
f6932d7
Rollup merge of #80843 - Mark-Simulacrum:fmt-bump, r=petrochenkov
jonas-schievink Jan 29, 2021
0648f5d
Rollup merge of #80886 - RalfJung:stable-raw-ref-macros, r=m-ou-se
jonas-schievink Jan 29, 2021
e222fde
Rollup merge of #80959 - jhpratt:unsigned_abs-stabilization, r=m-ou-se
jonas-schievink Jan 29, 2021
2b23c7f
Rollup merge of #81210 - ssomers:btree_fix_node_size_test, r=Mark-Sim…
jonas-schievink Jan 29, 2021
f7e2e37
Rollup merge of #81291 - sexxi-goose:fix-struct-update-functional-rec…
jonas-schievink Jan 29, 2021
bd44c36
Rollup merge of #81409 - gilescope:chars_count, r=joshtriplett
jonas-schievink Jan 29, 2021
444f2af
Rollup merge of #81468 - est31:cfg_version, r=petrochenkov
jonas-schievink Jan 29, 2021
b6b76dd
Rollup merge of #81472 - Aaron1011:fix/revert-cursor-clone, r=petroch…
jonas-schievink Jan 29, 2021
136e37d
Rollup merge of #81491 - jryans:rustdoc-deref-ice-81395, r=GuillaumeG…
jonas-schievink Jan 29, 2021
6b35385
Rollup merge of #81495 - camelid:rustdoc-output_format-optional, r=Gu…
jonas-schievink Jan 29, 2021
01898a2
Rollup merge of #81499 - SOF3:patch-1, r=sanxiyn
jonas-schievink Jan 29, 2021
1244551
Rollup merge of #81501 - calebcartwright:update-rustfmt, r=sanxiyn
jonas-schievink Jan 29, 2021
e56472c
Rollup merge of #81512 - GuillaumeGomez:cleanup-fixme-rustdoc, r=buga…
jonas-schievink Jan 29, 2021
bb999f7
Rollup merge of #81515 - eltociear:patch-7, r=jonas-schievink
jonas-schievink Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions library/alloc/src/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ use core::ops::{
};
use core::pin::Pin;
use core::ptr::{self, Unique};
use core::stream::Stream;
use core::task::{Context, Poll};

use crate::alloc::{handle_alloc_error, AllocError, Allocator, Global, Layout, WriteCloneIntoRaw};
Expand Down Expand Up @@ -1621,3 +1622,16 @@ where
F::poll(Pin::new(&mut *self), cx)
}
}

#[unstable(feature = "async_stream", issue = "79024")]
impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut **self).poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
1 change: 1 addition & 0 deletions library/alloc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
#![feature(array_windows)]
#![feature(allow_internal_unstable)]
#![feature(arbitrary_self_types)]
#![feature(async_stream)]
#![feature(box_patterns)]
#![feature(box_syntax)]
#![feature(cfg_sanitize)]
Expand Down
2 changes: 2 additions & 0 deletions library/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ pub mod panicking;
pub mod pin;
pub mod raw;
pub mod result;
#[unstable(feature = "async_stream", issue = "79024")]
pub mod stream;
pub mod sync;

pub mod fmt;
Expand Down
127 changes: 127 additions & 0 deletions library/core/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//! Composable asynchronous iteration.
//!
//! If futures are asynchronous values, then streams are asynchronous
//! iterators. If you've found yourself with an asynchronous collection of some kind,
//! and needed to perform an operation on the elements of said collection,
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
//! asynchronous Rust code, so it's worth becoming familiar with them.
//!
//! Before explaining more, let's talk about how this module is structured:
//!
//! # Organization
//!
//! This module is largely organized by type:
//!
//! * [Traits] are the core portion: these traits define what kind of streams
//! exist and what you can do with them. The methods of these traits are worth
//! putting some extra study time into.
//! * Functions provide some helpful ways to create some basic streams.
//! * Structs are often the return types of the various methods on this
//! module's traits. You'll usually want to look at the method that creates
//! the `struct`, rather than the `struct` itself. For more detail about why,
//! see '[Implementing Stream](#implementing-stream)'.
//!
//! [Traits]: #traits
//!
//! That's it! Let's dig into streams.
//!
//! # Stream
//!
//! The heart and soul of this module is the [`Stream`] trait. The core of
//! [`Stream`] looks like this:
//!
//! ```
//! # use core::task::{Context, Poll};
//! # use core::pin::Pin;
//! trait Stream {
//! type Item;
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//! ```
//!
//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`]
//! method which is used when implementing a `Stream`, and a (to-be-implemented)
//! `next` method which is used when consuming a stream. Consumers of `Stream`
//! only need to consider `next`, which when called, returns a future which
//! yields `Option<Stream::Item>`.
//!
//! The future returned by `next` will yield `Some(Item)` as long as there are
//! elements, and once they've all been exhausted, will yield `None` to indicate
//! that iteration is finished. If we're waiting on something asynchronous to
//! resolve, the future will wait until the stream is ready to yield again.
//!
//! Individual streams may choose to resume iteration, and so calling `next`
//! again may or may not eventually yield `Some(Item)` again at some point.
//!
//! [`Stream`]'s full definition includes a number of other methods as well,
//! but they are default methods, built on top of [`poll_next`], and so you get
//! them for free.
//!
//! [`Poll`]: super::task::Poll
//! [`poll_next`]: Stream::poll_next
//!
//! # Implementing Stream
//!
//! Creating a stream of your own involves two steps: creating a `struct` to
//! hold the stream's state, and then implementing [`Stream`] for that
//! `struct`.
//!
//! Let's make a stream named `Counter` which counts from `1` to `5`:
//!
//! ```no_run
//! #![feature(async_stream)]
//! # use core::stream::Stream;
//! # use core::task::{Context, Poll};
//! # use core::pin::Pin;
//!
//! // First, the struct:
//!
//! /// A stream which counts from one to five
//! struct Counter {
//! count: usize,
//! }
//!
//! // we want our count to start at one, so let's add a new() method to help.
//! // This isn't strictly necessary, but is convenient. Note that we start
//! // `count` at zero, we'll see why in `poll_next()`'s implementation below.
//! impl Counter {
//! fn new() -> Counter {
//! Counter { count: 0 }
//! }
//! }
//!
//! // Then, we implement `Stream` for our `Counter`:
//!
//! impl Stream for Counter {
//! // we will be counting with usize
//! type Item = usize;
//!
//! // poll_next() is the only required method
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
//! // Increment our count. This is why we started at zero.
//! self.count += 1;
//!
//! // Check to see if we've finished counting or not.
//! if self.count < 6 {
//! Poll::Ready(Some(self.count))
//! } else {
//! Poll::Ready(None)
//! }
//! }
//! }
//! ```
//!
//! # Laziness
//!
//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a
//! whole lot. Nothing really happens until you call `next`. This is sometimes a
//! source of confusion when creating a stream solely for its side effects. The
//! compiler will warn us about this kind of behavior:
//!
//! ```text
//! warning: unused result that must be used: streams do nothing unless polled
//! ```

mod stream;

pub use stream::Stream;
110 changes: 110 additions & 0 deletions library/core/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::ops::DerefMut;
use crate::pin::Pin;
use crate::task::{Context, Poll};

/// An interface for dealing with asynchronous iterators.
///
/// This is the main stream trait. For more about the concept of streams
/// generally, please see the [module-level documentation]. In particular, you
/// may want to know how to [implement `Stream`][impl].
///
/// [module-level documentation]: index.html
/// [impl]: index.html#implementing-stream
#[unstable(feature = "async_stream", issue = "79024")]
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
/// The type of items yielded by the stream.
type Item;

/// Attempt to pull out the next value of this stream, registering the
/// current task for wakeup if the value is not yet available, and returning
/// `None` if the stream is exhausted.
///
/// # Return value
///
/// There are several possible return values, each indicating a distinct
/// stream state:
///
/// - `Poll::Pending` means that this stream's next value is not ready
/// yet. Implementations will ensure that the current task will be notified
/// when the next value may be ready.
///
/// - `Poll::Ready(Some(val))` means that the stream has successfully
/// produced a value, `val`, and may produce further values on subsequent
/// `poll_next` calls.
///
/// - `Poll::Ready(None)` means that the stream has terminated, and
/// `poll_next` should not be invoked again.
///
/// # Panics
///
/// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
/// `poll_next` method again may panic, block forever, or cause other kinds of
/// problems; the `Stream` trait places no requirements on the effects of
/// such a call. However, as the `poll_next` method is not marked `unsafe`,
/// Rust's usual rules apply: calls must never cause undefined behavior
/// (memory corruption, incorrect use of `unsafe` functions, or the like),
/// regardless of the stream's state.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

/// Returns the bounds on the remaining length of the stream.
///
/// Specifically, `size_hint()` returns a tuple where the first element
/// is the lower bound, and the second element is the upper bound.
///
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
/// A [`None`] here means that either there is no known upper bound, or the
/// upper bound is larger than [`usize`].
///
/// # Implementation notes
///
/// It is not enforced that a stream implementation yields the declared
/// number of elements. A buggy stream may yield less than the lower bound
/// or more than the upper bound of elements.
///
/// `size_hint()` is primarily intended to be used for optimizations such as
/// reserving space for the elements of the stream, but must not be
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
/// implementation of `size_hint()` should not lead to memory safety
/// violations.
///
/// That said, the implementation should provide a correct estimation,
/// because otherwise it would be a violation of the trait's protocol.
///
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
/// stream.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

#[unstable(feature = "async_stream", issue = "79024")]
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
S::poll_next(Pin::new(&mut **self), cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

#[unstable(feature = "async_stream", issue = "79024")]
impl<P> Stream for Pin<P>
where
P: DerefMut + Unpin,
P::Target: Stream,
{
type Item = <P::Target as Stream>::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().as_mut().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
3 changes: 3 additions & 0 deletions library/std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
#![feature(allocator_internals)]
#![feature(allow_internal_unsafe)]
#![feature(allow_internal_unstable)]
#![feature(async_stream)]
#![feature(arbitrary_self_types)]
#![feature(array_error_internals)]
#![feature(asm)]
Expand Down Expand Up @@ -450,6 +451,8 @@ pub use core::ptr;
pub use core::raw;
#[stable(feature = "rust1", since = "1.0.0")]
pub use core::result;
#[unstable(feature = "async_stream", issue = "79024")]
pub use core::stream;
#[stable(feature = "i128", since = "1.26.0")]
#[allow(deprecated, deprecated_in_future)]
pub use core::u128;
Expand Down
14 changes: 14 additions & 0 deletions library/std/src/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::panicking;
use crate::pin::Pin;
use crate::ptr::{NonNull, Unique};
use crate::rc::Rc;
use crate::stream::Stream;
use crate::sync::atomic;
use crate::sync::{Arc, Mutex, RwLock};
use crate::task::{Context, Poll};
Expand Down Expand Up @@ -340,6 +341,19 @@ impl<F: Future> Future for AssertUnwindSafe<F> {
}
}

#[unstable(feature = "async_stream", issue = "79024")]
impl<S: Stream> Stream for AssertUnwindSafe<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

/// Invokes a closure, capturing the cause of an unwinding panic if one occurs.
///
/// This function will return `Ok` with the closure's result if the closure
Expand Down