diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index eebc02735b..0000000000 --- a/.travis.yml +++ /dev/null @@ -1,57 +0,0 @@ -language: rust -sudo: false - -matrix: - include: - - os: osx - - rust: stable - - rust: beta - - rust: nightly - script: cargo bench --all && cd futures-util && cargo bench --features=bench - - rust: nightly - before_script: - - pip install ghp-import --user && export PATH=$HOME/.local/bin:$PATH - script: - - cargo doc - after_success: - - ghp-import -n target/doc - - git push -qf https://${GH_TOKEN}@github.com/${TRAVIS_REPO_SLUG}.git gh-pages - - rust: stable - script: - - cargo build --manifest-path futures-core/Cargo.toml --no-default-features - - cargo build --manifest-path futures/Cargo.toml --no-default-features - - cargo build --manifest-path futures-channel/Cargo.toml --no-default-features - - cargo build --manifest-path futures-executor/Cargo.toml --no-default-features - - cargo build --manifest-path futures-sink/Cargo.toml --no-default-features - - cargo build --manifest-path futures-util/Cargo.toml --no-default-features - - rust: nightly - script: - - cargo build --manifest-path futures-core/Cargo.toml --features nightly - - cargo build --manifest-path futures-stable/Cargo.toml --features nightly - - cargo build --manifest-path futures-async-runtime/Cargo.toml --features nightly - - cargo build --manifest-path futures-macro-async/Cargo.toml --features nightly - - cargo build --manifest-path futures/Cargo.toml --features nightly - - cargo test --manifest-path futures-macro-await/Cargo.toml --features nightly - - cargo test --manifest-path futures/Cargo.toml --features nightly --test async_await_tests - - rust: nightly - script: - - rustup component add rust-src - - cargo install xargo - - xargo build --manifest-path futures/Cargo.toml --target thumbv6m-none-eabi --no-default-features --features nightly - - rust: 1.20.0 - script: cargo test --all - - rust: nightly - script: - - cargo test --manifest-path futures/testcrate/Cargo.toml - -script: - - cargo test --all - - cargo test --all --release - -env: - global: - - secure: "iwVcMVIF7ZSY82fK5UyyUvVvJxMSYrbZawh1+4Oi8pvOdYq1gptcDoOC8jxWwCwrNF1b+/85n+jlEUngEqqSmV5PjAbWPjoc+u4Zn7CRi1AlxoUlvHPiQm4vM4Mkkd6GsqoIZttCeedU9m/w0nQ18uUtK8uD6vr2FVdcMnUnkYQAxuGOowGLrwidukzfBXMCu/JrwKMIbt61knAFiI/KJknu0h1mRrhpeF/sQ3tJFzRRcQeFJkbfwDzltMpPo1hq5D3HI4ONjYi/qO2pwUhDk4umfp9cLW9MS8rQvptxJTQmWemHi+f2/U4ld6a0URL6kEuMkt/EbH0A74eFtlicfRs44dX9MlWoqbLypnC3ymqmHcpwcwNA3HmZyg800MTuU+BPK41HIPdO9tPpxjHEiqvNDknH7qs+YBnis0eH7DHJgEjXq651PjW7pm+rnHPwsj+OzKE1YBNxBQZZDkS3VnZJz+O4tVsOzc3IOz0e+lf7VVuI17C9haj117nKp3umC4MVBA0S8RfreFgqpyDeY2zwcqOr0YOlEGGRl0vyWP8Qcxx12kQ7+doLolt6Kxda4uO0hKRmIF6+qki1T+L7v8BOGOtCncz4f7IX48eQ7+Wu0OtglRn45qAa3CxjUuW6xX3KSNH66PCXV0Jtp8Ga2SSevX2wtbbFu9f+9R+PQY4=" - -notifications: - email: - on_success: never diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d0c673176..cc0427bb3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,23 @@ +**Note**: This CHANGELOG is no longer maintained for newer 0.1.x and +other releases. See instead the github release tags and individual +git commits. + +----- + +# 0.2.1 - 2018-4-19 (*yanked*) + +* Add the `futures-stable` crate for working with immovable futures. +* Add async/await support behind the `nightly` feature gate. +* Add `Stream::for_each_concurrent`. + +# 0.2.0 - 2018-4-6 (*yanked*) + +* Separate out the futures crate into a number of separate crates which + are reexported by a single facade. +* Add a `&mut task::Context` parameter to `poll` methods. This type includes + access to a wakeup callback, task-local-data, and a default executor. +* Refactor the `Sink` trait. + # 0.1.17 - 2017-10-31 * Add a `close` method on `sink::Wait` diff --git a/Cargo.toml b/Cargo.toml index d2499dc116..6a5bafdae6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,14 @@ [workspace] members = [ "futures", - "futures-async-runtime", - "futures-core", - "futures-channel", - "futures-executor", - "futures-io", - "futures-macro-async", - "futures-macro-await", - "futures-sink", - "futures-stable", - "futures-util", +# "futures-async-runtime", +# "futures-core", +# "futures-channel", +# "futures-executor", +# "futures-io", +# "futures-macro-async", +# "futures-macro-await", +# "futures-sink", +# "futures-stable", +# "futures-util", ] diff --git a/README.md b/README.md index f846a32221..58a19129cd 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +**Announcement: Current development of the futures crates is happening on the 0.3 branch** + # futures-rs This library is an implementation of **zero-cost futures** in Rust. @@ -15,7 +17,7 @@ First, add this to your `Cargo.toml`: ```toml [dependencies] -futures = "0.2.0" +futures = "0.1.22" ``` Next, add this to your crate: @@ -38,7 +40,7 @@ a `#[no_std]` environment, use: ```toml [dependencies] -futures = { version = "0.2.0", default-features = false } +futures = { version = "0.1.22", default-features = false } ``` # License diff --git a/appveyor.yml b/appveyor.yml deleted file mode 100644 index ab0aac6cef..0000000000 --- a/appveyor.yml +++ /dev/null @@ -1,35 +0,0 @@ -environment: - - # At the time this was added AppVeyor was having troubles with checking - # revocation of SSL certificates of sites like static.rust-lang.org and what - # we think is crates.io. The libcurl HTTP client by default checks for - # revocation on Windows and according to a mailing list [1] this can be - # disabled. - # - # The `CARGO_HTTP_CHECK_REVOKE` env var here tells cargo to disable SSL - # revocation checking on Windows in libcurl. Note, though, that rustup, which - # we're using to download Rust here, also uses libcurl as the default backend. - # Unlike Cargo, however, rustup doesn't have a mechanism to disable revocation - # checking. To get rustup working we set `RUSTUP_USE_HYPER` which forces it to - # use the Hyper instead of libcurl backend. Both Hyper and libcurl use - # schannel on Windows but it appears that Hyper configures it slightly - # differently such that revocation checking isn't turned on by default. - # - # [1]: https://curl.haxx.se/mail/lib-2016-03/0202.html - RUSTUP_USE_HYPER: 1 - CARGO_HTTP_CHECK_REVOKE: false - - matrix: - - TARGET: x86_64-pc-windows-msvc -install: - - set PATH=C:\Program Files\Git\mingw64\bin;%PATH% - - curl -sSf -o rustup-init.exe https://win.rustup.rs/ - - rustup-init.exe -y --default-host %TARGET% - - set PATH=%PATH%;C:\Users\appveyor\.cargo\bin - - rustc -V - - cargo -V - -build: false - -test_script: - - cargo test --all diff --git a/futures-async-runtime/Cargo.toml b/futures-async-runtime/Cargo.toml index 6660c1a8c8..679cf0719c 100644 --- a/futures-async-runtime/Cargo.toml +++ b/futures-async-runtime/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "futures-async-runtime" -version = "0.2.0" +name = "futures-async-runtime-preview" +version = "0.2.3" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" -readme = "README.md" +readme = "../README.md" keywords = ["async"] categories = ["asynchronous", "rust-patterns"] repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -13,17 +13,20 @@ description = """ Runtime for the async/await macros in the `futures` crate. """ -[dependencies.futures-core] -version = "0.2.0" +[lib] +name = "futures_async_runtime" + +[dependencies.futures-core-preview] +version = "0.2.2" path = "../futures-core" default-features = false -[dependencies.futures-stable] -version = "0.2.0" +[dependencies.futures-stable-preview] +version = "0.2.2" path = "../futures-stable" default-features = false [features] -nightly = ["futures-core/nightly", "futures-stable/nightly"] -std = ["futures-core/std", "futures-stable/std"] +nightly = ["futures-core-preview/nightly", "futures-stable-preview/nightly"] +std = ["futures-core-preview/std", "futures-stable-preview/std"] default = ["std"] diff --git a/futures-async-runtime/LICENSE-APACHE b/futures-async-runtime/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-async-runtime/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-async-runtime/LICENSE-MIT b/futures-async-runtime/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-async-runtime/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-async-runtime/src/future.rs b/futures-async-runtime/src/future.rs index 712af91721..99fc46de2a 100644 --- a/futures-async-runtime/src/future.rs +++ b/futures-async-runtime/src/future.rs @@ -1,5 +1,5 @@ use std::marker::Unpin; -use std::mem::Pin; +use std::mem::PinMut; use std::ops::{Generator, GeneratorState}; use super::{IsResult, Reset, CTX}; @@ -25,12 +25,12 @@ impl StableFuture for GenStableFuture type Item = ::Ok; type Error = ::Err; - fn poll(mut self: Pin, ctx: &mut task::Context) -> Poll { + fn poll(self: PinMut, ctx: &mut task::Context) -> Poll { CTX.with(|cell| { let _r = Reset::new(ctx, cell); - let this: &mut Self = unsafe { Pin::get_mut(&mut self) }; + let this: &mut Self = unsafe { PinMut::get_mut_unchecked(self) }; // This is an immovable generator, but since we're only accessing - // it via a Pin this is safe. + // it via a PinMut this is safe. match unsafe { this.0.resume() } { GeneratorState::Yielded(Async::Pending) => Ok(Async::Pending), diff --git a/futures-async-runtime/src/stream.rs b/futures-async-runtime/src/stream.rs index ed82f481fa..62f5351f27 100644 --- a/futures-async-runtime/src/stream.rs +++ b/futures-async-runtime/src/stream.rs @@ -1,4 +1,4 @@ -use std::mem::Pin; +use std::mem::PinMut; use std::ops::{Generator, GeneratorState}; use std::marker::{PhantomData, Unpin}; @@ -37,13 +37,13 @@ impl StableStream for GenStableStream type Item = U; type Error = ::Err; - fn poll_next(mut self: Pin, ctx: &mut task::Context) -> Poll, Self::Error> { + fn poll_next(self: PinMut, ctx: &mut task::Context) -> Poll, Self::Error> { CTX.with(|cell| { let _r = Reset::new(ctx, cell); - let this: &mut Self = unsafe { Pin::get_mut(&mut self) }; + let this: &mut Self = unsafe { PinMut::get_mut_unchecked(self) }; if this.done { return Ok(Async::Ready(None)) } // This is an immovable generator, but since we're only accessing - // it via a Pin this is safe. + // it via a PinMut this is safe. match unsafe { this.gen.resume() } { GeneratorState::Yielded(Async::Ready(e)) => { Ok(Async::Ready(Some(e))) diff --git a/futures-channel/Cargo.toml b/futures-channel/Cargo.toml index 52deeb4bad..6eaa758377 100644 --- a/futures-channel/Cargo.toml +++ b/futures-channel/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-channel" -version = "0.2.0" +name = "futures-channel-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -10,13 +10,16 @@ description = """ Channels for asynchronous communication using futures-rs. """ +[lib] +name = "futures_channel" + [features] -std = ["futures-core/std"] +std = ["futures-core-preview/std"] default = ["std"] [dependencies] -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } +futures-core-preview = { path = "../futures-core", version = "0.2.2", default-features = false } [dev-dependencies] -futures = { path = "../futures", version = "0.2.0", default-features = true } -futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = true } +futures-preview = { path = "../futures", version = "0.2.2", default-features = true } +futures-executor-preview = { path = "../futures-executor", version = "0.2.2", default-features = true } diff --git a/futures-channel/LICENSE-APACHE b/futures-channel/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-channel/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-channel/LICENSE-MIT b/futures-channel/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-channel/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-channel/benches/sync_mpsc.rs b/futures-channel/benches/sync_mpsc.rs old mode 100755 new mode 100644 index 2296d88a20..89c9140e63 --- a/futures-channel/benches/sync_mpsc.rs +++ b/futures-channel/benches/sync_mpsc.rs @@ -114,7 +114,6 @@ impl Stream for TestSender { try_ready!(self.tx.poll_ready(cx).map_err(|_| ())); self.tx.start_send(self.last + 1).unwrap(); self.last += 1; - assert!(self.tx.poll_flush(cx).unwrap().is_ready()); Ok(Async::Ready(Some(self.last))) } } diff --git a/futures-channel/src/lib.rs b/futures-channel/src/lib.rs index 58a54b9b07..c6fa7187ba 100644 --- a/futures-channel/src/lib.rs +++ b/futures-channel/src/lib.rs @@ -4,7 +4,7 @@ //! asynchronous tasks. #![deny(missing_docs, missing_debug_implementations)] -#![doc(html_root_url = "https://docs.rs/futures-channel/0.2.0")] +#![doc(html_root_url = "https://docs.rs/futures-channel/0.2.2")] #![no_std] #[cfg(feature = "std")] diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index 06dd9bbb78..5d75f35df3 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -3,20 +3,21 @@ //! //! Similarly to the `std`, channel creation provides [`Receiver`](Receiver) and //! [`Sender`](Sender) handles. [`Receiver`](Receiver) implements -//! [`Stream`](futures_core::Stream) and allows a task to read values out of the +//! [`Stream`] and allows a task to read values out of the //! channel. If there is no message to read from the channel, the current task -//! will be notified when a new value is sent. [`Sender`](Sender) implements the +//! will be awoken when a new value is sent. [`Sender`](Sender) implements the //! `Sink` trait and allows a task to send messages into //! the channel. If the channel is at capacity, the send will be rejected and -//! the task will be notified when additional capacity is available. In other -//! words, the channel provides backpressure. +//! the task will be awoken when additional capacity is available. This process +//! of delaying sends beyond a certain capacity is often referred to as +//! "backpressure". //! -//! Unbounded channels are also available using the [`unbounded`](unbounded) -//! constructor. +//! Unbounded channels (without backpressure) are also available using +//! the [`unbounded`](unbounded) function. //! //! # Disconnection //! -//! When all [`Sender`](Sender) handles have been dropped, it is no longer +//! When all [`Sender`](Sender)s have been dropped, it is no longer //! possible to send values into the channel. This is considered the termination //! event of the stream. As such, [`Receiver::poll_next`](Receiver::poll_next) //! will return `Ok(Ready(None))`. @@ -37,36 +38,37 @@ // At the core, the channel uses an atomic FIFO queue for message passing. This // queue is used as the primary coordination primitive. In order to enforce // capacity limits and handle back pressure, a secondary FIFO queue is used to -// send parked task handles. +// send wakers for blocked `Sender` tasks. // // The general idea is that the channel is created with a `buffer` size of `n`. // The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" // slot to hold a message. This allows `Sender` to know for a fact that a send -// will succeed *before* starting to do the actual work of sending the value. -// Since most of this work is lock-free, once the work starts, it is impossible -// to safely revert. +// can be successfully started *before* beginning to do the actual work of +// sending the value. However, a `send` will not complete until the number of +// messages in the channel has dropped back down below the configured buffer +// size. // -// If the sender is unable to process a send operation, then the current -// task is parked and the handle is sent on the parked task queue. -// -// Note that the implementation guarantees that the channel capacity will never -// exceed the configured limit, however there is no *strict* guarantee that the -// receiver will wake up a parked task *immediately* when a slot becomes -// available. However, it will almost always unpark a task when a slot becomes -// available and it is *guaranteed* that a sender will be unparked when the -// message that caused the sender to become parked is read out of the channel. +// Note that the implementation guarantees that the number of items that have +// finished sending into a channel without being received will not exceed the +// configured buffer size. However, there is no *strict* guarantee that the +// receiver will wake up a blocked `Sender` *immediately* when the buffer size +// drops below the configured limit. However, it will almost always awaken a +// `Sender` when buffer space becomes available, and it is *guaranteed* that a +// `Sender` will be awoken by the time its most recently-sent message is +// popped out of the channel by the `Receiver`. // // The steps for sending a message are roughly: // // 1) Increment the channel message count -// 2) If the channel is at capacity, push the task handle onto the wait queue -// 3) Push the message onto the message queue. +// 2) If the channel is at capacity, push the task's waker onto the wait queue +// 3) Push the message onto the message queue +// 4) If a wakeup was queued, wait for it to occur // // The steps for receiving a message are roughly: // // 1) Pop a message from the message queue -// 2) Pop a task handle from the wait queue -// 3) Decrement the channel message count. +// 2) Pop a task waker from the wait queue +// 3) Decrement the channel message count // // It's important for the order of operations on lock-free structures to happen // in reverse order between the sender and receiver. This makes the message @@ -77,13 +79,13 @@ use std::fmt; use std::error::Error; use std::any::Any; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex}; use std::thread; use std::usize; -use futures_core::task::{self, Waker}; +use futures_core::task::{self, AtomicWaker, Waker}; use futures_core::{Async, Poll, Stream}; use futures_core::never::Never; @@ -102,22 +104,13 @@ pub struct Sender { // Handle to the task that is blocked on this sender. This handle is sent // to the receiver half in order to be notified when the sender becomes // unblocked. - sender_task: Arc>, + sender_waker: Arc>, // True if the sender might be blocked. This is an optimization to avoid // having to lock the mutex most of the time. - maybe_parked: bool, + maybe_blocked: bool, } -/// The transmission end of an unbounded mpsc channel. -/// -/// This value is created by the [`unbounded`](unbounded) function. -#[derive(Debug)] -pub struct UnboundedSender(Sender); - -trait AssertKinds: Send + Sync + Clone {} -impl AssertKinds for UnboundedSender {} - /// The receiving end of a bounded mpsc channel. /// /// This value is created by the [`channel`](channel) function. @@ -126,12 +119,6 @@ pub struct Receiver { inner: Arc>, } -/// The receiving end of an unbounded mpsc channel. -/// -/// This value is created by the [`unbounded`](unbounded) function. -#[derive(Debug)] -pub struct UnboundedReceiver(Receiver); - /// The error type for [`Sender`s](Sender) used as `Sink`s. #[derive(Clone, Debug, PartialEq, Eq)] pub struct SendError { @@ -192,6 +179,10 @@ impl SendError { _ => false, } } + + fn disconnected() -> Self { + SendError { kind: SendErrorKind::Disconnected } + } } impl fmt::Debug for TrySendError { @@ -276,13 +267,13 @@ struct Inner { message_queue: Queue>, // Atomic, FIFO queue used to send parked task handles to the receiver. - parked_queue: Queue>>, + parked_queue: Queue>>, // Number of senders in existence num_senders: AtomicUsize, - // Handle to the receiver's task. - recv_task: Mutex, + // Waker for the receiver's task. + recv_waker: AtomicWaker, } // Struct representation of `Inner::state`. @@ -295,19 +286,6 @@ struct State { num_messages: usize, } -#[derive(Debug)] -struct ReceiverTask { - unparked: bool, - task: Option, -} - -// Returned from Receiver::try_park() -enum TryPark { - Parked, - Closed, - NotEmpty, -} - // The `is_open` flag is stored in the left-most bit of `Inner::state` const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); @@ -324,24 +302,24 @@ const MAX_BUFFER: usize = MAX_CAPACITY >> 1; // Sent to the consumer to wake up blocked producers #[derive(Debug)] -struct SenderTask { - task: Option, - is_parked: bool, +struct SenderWaker { + waker: Option, + is_blocked: bool, } -impl SenderTask { +impl SenderWaker { fn new() -> Self { - SenderTask { - task: None, - is_parked: false, + SenderWaker { + waker: None, + is_blocked: false, } } - fn notify(&mut self) { - self.is_parked = false; + fn wake(&mut self) { + self.is_blocked = false; - if let Some(task) = self.task.take() { - task.wake(); + if let Some(waker) = self.waker.take() { + waker.wake(); } } } @@ -355,7 +333,7 @@ impl SenderTask { /// `buffer` "first come, first serve" slots available to all senders. /// /// The [`Receiver`](Receiver) returned implements the -/// [`Stream`](futures_core::Stream) trait, while [`Sender`](Sender) implements +/// [`Stream`] trait, while [`Sender`](Sender) implements /// `Sink`. pub fn channel(buffer: usize) -> (Sender, Receiver) { // Check that the requested buffer size does not exceed the maximum buffer @@ -364,20 +342,6 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { channel2(Some(buffer)) } -/// Creates an unbounded mpsc channel for communicating between asynchronous tasks. -/// -/// A `send` on this channel will always succeed as long as the receive half has -/// not been closed. If the receiver falls behind, messages will be arbitrarily -/// buffered. -/// -/// **Note** that the amount of available system memory is an implicit bound to -/// the channel. Using an `unbounded` channel has the ability of causing the -/// process to run out of memory. In this case, the process will be aborted. -pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { - let (tx, rx) = channel2(None); - (UnboundedSender(tx), UnboundedReceiver(rx)) -} - fn channel2(buffer: Option) -> (Sender, Receiver) { let inner = Arc::new(Inner { buffer: buffer, @@ -385,16 +349,13 @@ fn channel2(buffer: Option) -> (Sender, Receiver) { message_queue: Queue::new(), parked_queue: Queue::new(), num_senders: AtomicUsize::new(1), - recv_task: Mutex::new(ReceiverTask { - unparked: false, - task: None, - }), + recv_waker: AtomicWaker::new(), }); let tx = Sender { inner: inner.clone(), - sender_task: Arc::new(Mutex::new(SenderTask::new())), - maybe_parked: false, + sender_waker: Arc::new(Mutex::new(SenderWaker::new())), + maybe_blocked: false, }; let rx = Receiver { @@ -475,7 +436,7 @@ impl Sender { self.park(cx); } - self.queue_push_and_signal(Some(msg)); + self.push_msg_and_wake_receiver(Some(msg)); Ok(()) } @@ -506,31 +467,18 @@ impl Sender { }, }; - self.queue_push_and_signal(msg); + self.push_msg_and_wake_receiver(msg); Ok(()) } - fn poll_ready_nb(&self) -> Poll<(), SendError> { - let state = decode_state(self.inner.state.load(SeqCst)); - if state.is_open { - Ok(Async::Ready(())) - } else { - Err(SendError { - kind: SendErrorKind::Full, - }) - } - } - - // Push message to the queue and signal to the receiver - fn queue_push_and_signal(&self, msg: Option) { + fn push_msg_and_wake_receiver(&self, msg: Option) { // Push the message onto the message queue self.inner.message_queue.push(msg); - // Signal to the receiver that a message has been enqueued. If the - // receiver is parked, this will unpark the task. - self.signal(); + // Awaken the reciever task if it was blocked. + self.inner.recv_waker.wake(); } // Increment the number of queued messages. Returns if the sender should @@ -576,54 +524,24 @@ impl Sender { } } - // Signal to the receiver task that a message has been enqueued - fn signal(&self) { - // TODO - // This logic can probably be improved by guarding the lock with an - // atomic. - // - // Do this step first so that the lock is dropped when - // `unpark` is called - let task = { - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - // If the receiver has already been unparked, then there is nothing - // more to do - if recv_task.unparked { - return; - } - - // Setting this flag enables the receiving end to detect that - // an unpark event happened in order to avoid unnecessarily - // parking. - recv_task.unparked = true; - recv_task.task.take() - }; - - if let Some(task) = task { - task.wake(); - } - } - fn park(&mut self, cx: Option<&mut task::Context>) { // TODO: clean up internal state if the task::current will fail - let task = cx.map(|cx| cx.waker().clone()); + let waker = cx.map(|cx| cx.waker().clone()); { - let mut sender = self.sender_task.lock().unwrap(); - sender.task = task; - sender.is_parked = true; + let mut sender = self.sender_waker.lock().unwrap(); + sender.waker = waker; + sender.is_blocked = true; } // Send handle over queue - let t = self.sender_task.clone(); + let t = self.sender_waker.clone(); self.inner.parked_queue.push(t); // Check to make sure we weren't closed after we sent our task on the // queue - let state = decode_state(self.inner.state.load(SeqCst)); - self.maybe_parked = state.is_open; + self.maybe_blocked = !self.is_closed(); } /// Polls the channel to determine if there is guaranteed capacity to send @@ -663,24 +581,24 @@ impl Sender { } fn poll_unparked(&mut self, cx: Option<&mut task::Context>) -> Async<()> { - // First check the `maybe_parked` variable. This avoids acquiring the + // First check the `maybe_blocked` variable. This avoids acquiring the // lock in most cases - if self.maybe_parked { + if self.maybe_blocked { // Get a lock on the task handle - let mut task = self.sender_task.lock().unwrap(); + let mut sender_waker = self.sender_waker.lock().unwrap(); - if !task.is_parked { - self.maybe_parked = false; + if !sender_waker.is_blocked { + self.maybe_blocked = false; return Async::Ready(()) } - // At this point, an unpark request is pending, so there will be an - // unpark sometime in the future. We just need to make sure that + // At this point, an wake request is pending, so there will be an + // wake sometime in the future. We just need to make sure that // the correct task will be notified. // - // Update the task in case the `Sender` has been moved to another + // Update the waker in case the `Sender` has been moved to another // task - task.task = cx.map(|cx| cx.waker().clone()); + sender_waker.waker = cx.map(|cx| cx.waker().clone()); Async::Pending } else { @@ -689,52 +607,6 @@ impl Sender { } } -impl UnboundedSender { - /// Check if the channel is ready to receive a message. - pub fn poll_ready(&self, _: &mut task::Context) -> Poll<(), SendError> { - self.0.poll_ready_nb() - } - - /// Returns whether this channel is closed without needing a context. - pub fn is_closed(&self) -> bool { - self.0.is_closed() - } - - /// Closes this channel from the sender side, preventing any new messages. - pub fn close_channel(&self) { - // There's no need to park this sender, its dropping, - // and we don't want to check for capacity, so skip - // that stuff from `do_send`. - - let _ = self.0.do_send_nb(None); - } - - /// Send a message on the channel. - /// - /// This method should only be called after `poll_ready` has been used to - /// verify that the channel is ready to receive a message. - pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.0.do_send_nb(Some(msg)) - .map_err(|e| e.err) - } - - /// Sends a message along this channel. - /// - /// This is an unbounded sender, so this function differs from `Sink::send` - /// by ensuring the return type reflects that the channel is always ready to - /// receive messages. - pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { - self.0.do_send_nb(Some(msg)) - } -} - -impl Clone for UnboundedSender { - fn clone(&self) -> UnboundedSender { - UnboundedSender(self.0.clone()) - } -} - - impl Clone for Sender { fn clone(&self) -> Sender { // Since this atomic op isn't actually guarding any memory and we don't @@ -758,8 +630,8 @@ impl Clone for Sender { if actual == curr { return Sender { inner: self.inner.clone(), - sender_task: Arc::new(Mutex::new(SenderTask::new())), - maybe_parked: false, + sender_waker: Arc::new(Mutex::new(SenderWaker::new())), + maybe_blocked: false, }; } @@ -817,7 +689,7 @@ impl Receiver { loop { match unsafe { self.inner.parked_queue.pop() } { PopResult::Data(task) => { - task.lock().unwrap().notify(); + task.lock().unwrap().wake(); } PopResult::Empty => break, PopResult::Inconsistent => thread::yield_now(), @@ -825,7 +697,7 @@ impl Receiver { } } - /// Tries to receive the next message without notifying a context if empty. + /// Tries to receive the next message without wakeing a context if empty. /// /// It is not recommended to call this function from inside of a future, /// only when you've otherwise arranged to be notified when the channel is @@ -846,7 +718,7 @@ impl Receiver { PopResult::Data(msg) => { // If there are any parked task handles in the parked queue, pop // one and unpark it. - self.unpark_one(); + self.wake_one(); // Decrement number of messages self.dec_num_messages(); @@ -876,11 +748,11 @@ impl Receiver { } // Unpark a single task handle if there is one pending in the parked queue - fn unpark_one(&mut self) { + fn wake_one(&mut self) { loop { match unsafe { self.inner.parked_queue.pop() } { PopResult::Data(task) => { - task.lock().unwrap().notify(); + task.lock().unwrap().wake(); return; } PopResult::Empty => { @@ -895,29 +767,6 @@ impl Receiver { } } - // Try to park the receiver task - fn try_park(&self, cx: &mut task::Context) -> TryPark { - let curr = self.inner.state.load(SeqCst); - let state = decode_state(curr); - - // If the channel is closed, then there is no need to park. - if !state.is_open && state.num_messages == 0 { - return TryPark::Closed; - } - - // First, track the task in the `recv_task` slot - let mut recv_task = self.inner.recv_task.lock().unwrap(); - - if recv_task.unparked { - // Consume the `unpark` signal without actually parking - recv_task.unparked = false; - return TryPark::NotEmpty; - } - - recv_task.task = Some(cx.waker().clone()); - TryPark::Parked - } - fn dec_num_messages(&self) { let mut curr = self.inner.state.load(SeqCst); @@ -933,6 +782,21 @@ impl Receiver { } } } + + fn poll_next_no_register(&mut self) -> Async> { + // Try to read a message off of the message queue. + if let Async::Ready(msg) = self.next_message() { + return Async::Ready(msg); + } + + // Check if the channel is closed. + let state = decode_state(self.inner.state.load(SeqCst)); + if !state.is_open && state.num_messages == 0 { + return Async::Ready(None); + } + + Async::Pending + } } impl Stream for Receiver { @@ -940,37 +804,19 @@ impl Stream for Receiver { type Error = Never; fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - loop { - // Try to read a message off of the message queue. - let msg = match self.next_message() { - Async::Ready(msg) => msg, - Async::Pending => { - // There are no messages to read, in this case, attempt to - // park. The act of parking will verify that the channel is - // still empty after the park operation has completed. - match self.try_park(cx) { - TryPark::Parked => { - // The task was parked, and the channel is still - // empty, return Pending. - return Ok(Async::Pending); - } - TryPark::Closed => { - // The channel is closed, there will be no further - // messages. - return Ok(Async::Ready(None)); - } - TryPark::NotEmpty => { - // A message has been sent while attempting to - // park. Loop again, the next iteration is - // guaranteed to get the message. - continue; - } - } - } - }; - // Return the message - return Ok(Async::Ready(msg)); + if let Async::Ready(x) = self.poll_next_no_register() { + return Ok(Async::Ready(x)); } + + // Register to receive a wakeup when more messages are sent. + self.inner.recv_waker.register(cx.waker()); + + // Check again for messages just in case one arrived in + // between the call to `next_message` and `register` above. + Ok(self.poll_next_no_register()) + + // The channel is not empty, not closed, and + // we're set to receive a wakeup when a message is sent. } } @@ -984,34 +830,6 @@ impl Drop for Receiver { } } -impl UnboundedReceiver { - /// Closes the receiving half of the channel, without dropping it. - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - pub fn close(&mut self) { - self.0.close(); - } - - /// Tries to receive the next message without notifying a context if empty. - /// - /// It is not recommended to call this function from inside of a future, - /// only when you've otherwise arranged to be notified when the channel is - /// no longer empty. - pub fn try_next(&mut self) -> Result, TryRecvError> { - self.0.try_next() - } -} - -impl Stream for UnboundedReceiver { - type Item = T; - type Error = Never; - - fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - self.0.poll_next(cx) - } -} - /* * * ===== impl Inner ===== @@ -1054,3 +872,189 @@ fn encode_state(state: &State) -> usize { num } + +/* + * + * ==== Unbounded channels ==== + * + */ + +/// Creates an unbounded mpsc channel for communicating between asynchronous tasks. +/// +/// A `send` on this channel will always succeed as long as the receive half has +/// not been closed. If the receiver falls behind, messages will be arbitrarily +/// buffered. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded() -> (UnboundedSender, UnboundedReceiver) { + let tx = Arc::new(UnboundedInner { + closed: AtomicBool::new(false), + message_queue: Queue::new(), + recv_waker: AtomicWaker::new(), + }); + let rx = tx.clone(); + (UnboundedSender(tx), UnboundedReceiver(rx)) +} + +/// The transmission end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug, Clone)] +pub struct UnboundedSender(Arc>); + +/// The receiving end of an unbounded mpsc channel. +/// +/// This value is created by the [`unbounded`](unbounded) function. +#[derive(Debug)] +pub struct UnboundedReceiver(Arc>); + +trait AssertKinds: Send + Sync + Clone {} +impl AssertKinds for UnboundedSender {} + +#[derive(Debug)] +struct UnboundedInner { + closed: AtomicBool, + message_queue: Queue, + recv_waker: AtomicWaker, +} + +impl UnboundedSender { + /// Check if the channel is ready to receive a message. + pub fn poll_ready(&self, _: &mut task::Context) -> Poll<(), SendError> { + Ok(Async::Ready(())) + } + + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.closed.load(SeqCst) + } + + /// Closes this channel from the sender side, preventing any new messages. + pub fn close_channel(&self) { + self.0.closed.store(true, SeqCst); + self.0.recv_waker.wake(); + } + + /// Send a message on the channel. + /// + /// This method should only be called after `poll_ready` has been used to + /// verify that the channel is ready to receive a message. + pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { + if self.0.closed.load(SeqCst) { + return Err(SendError::disconnected()); + } + self.0.message_queue.push(msg); + self.0.recv_waker.wake(); + Ok(()) + } + + /// Sends a message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { + // TODO there's a race between checking the `closed` atomicbool + // and pushing onto the queue. + if self.0.closed.load(SeqCst) { + return Err(TrySendError { + err: SendError::disconnected(), + val: msg, + }); + } + self.0.message_queue.push(msg); + self.0.recv_waker.wake(); + Ok(()) + } +} + +impl Drop for UnboundedSender { + fn drop(&mut self) { + if Arc::strong_count(&self.0) == 2 { + // If it's just us and the reciever, or us and another sender, + // the channel should be closed. + self.0.closed.store(true, SeqCst); + self.0.recv_waker.wake(); + } + } +} + +impl UnboundedReceiver { + /// Closes the receiving half of the channel, without dropping it. + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.0.closed.store(true, SeqCst); + } + + /// Tries to receive the next message without notifying a context if empty. + /// + /// It is not recommended to call this function from inside of a future, + /// only when you've otherwise arranged to be notified when the channel is + /// no longer empty. + pub fn try_next(&mut self) -> Result, TryRecvError> { + loop { + // Safe because this is the only place the message queue is popped, + // and it takes `&mut self` to ensure that only the unique reciever + // can pop off of the message queue. + match unsafe { self.0.message_queue.pop() } { + PopResult::Data(msg) => return Ok(Some(msg)), + PopResult::Empty => { + if self.0.closed.load(SeqCst) { + // Ensure that the `closed` state wasn't written after + // a final message was sent. + match unsafe { self.0.message_queue.pop() } { + PopResult::Data(msg) => return Ok(Some(msg)), + PopResult::Empty => return Ok(None), + PopResult::Inconsistent => { + thread::yield_now(); + continue; + } + } + } + return Err(TryRecvError { _inner: () }); + } + PopResult::Inconsistent => { + // Inconsistent means that there will be a message to pop + // in a short time. This branch can only be reached if + // values are being produced from another thread, so there + // are a few ways that we can deal with this: + // + // 1) Spin + // 2) thread::yield_now() + // 3) task::current().unwrap() & return Pending + // + // For now, thread::yield_now() is used, but it would + // probably be better to spin a few times then yield. + thread::yield_now(); + } + } + } + } +} + +impl Stream for UnboundedReceiver { + type Item = T; + type Error = Never; + + fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { + if let Ok(msg) = self.try_next() { + return Ok(Async::Ready(msg)); + } + self.0.recv_waker.register(cx.waker()); + if let Ok(msg) = self.try_next() { + return Ok(Async::Ready(msg)); + } + Ok(Async::Pending) + } +} + +impl Drop for UnboundedReceiver { + fn drop(&mut self) { + self.0.closed.store(true, SeqCst); + } +} + diff --git a/futures-channel/src/mpsc/queue.rs b/futures-channel/src/mpsc/queue.rs index 0c4d7953e4..15af832b49 100644 --- a/futures-channel/src/mpsc/queue.rs +++ b/futures-channel/src/mpsc/queue.rs @@ -80,7 +80,7 @@ unsafe impl Send for Queue { } unsafe impl Sync for Queue { } impl Node { - unsafe fn new(v: Option) -> *mut Node { + fn new(v: Option) -> *mut Node { Box::into_raw(Box::new(Node { next: AtomicPtr::new(ptr::null_mut()), value: v, @@ -92,7 +92,7 @@ impl Queue { /// Creates a new queue that is safe to share among multiple producers and /// one consumer. pub fn new() -> Queue { - let stub = unsafe { Node::new(None) }; + let stub = Node::new(None); Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub), diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index 9d7395a070..a3fdba69d0 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -12,7 +12,7 @@ use futures::future::poll_fn; use futures::task; use futures_channel::mpsc; use futures_channel::oneshot; -use futures_executor::block_on; +use futures_executor::{block_on, block_on_stream}; trait AssertSend: Send {} impl AssertSend for mpsc::Sender {} @@ -94,20 +94,15 @@ fn send_recv_threads() { #[test] fn send_recv_threads_no_capacity() { let (tx, rx) = mpsc::channel::(0); + let mut rx = block_on_stream(rx); - let (readytx, readyrx) = mpsc::channel::<()>(2); let t = thread::spawn(move|| { - let readytx = readytx.sink_map_err(|_| panic!()); - let (a, b) = block_on(tx.send(1).join(readytx.send(()))).unwrap(); - block_on(a.send(2).join(b.send(()))).unwrap(); + let tx = block_on(tx.send(1)).unwrap(); + block_on(tx.send(2)).unwrap(); }); - let readyrx = block_on(readyrx.next()).ok().unwrap().1; - let (item, rx) = block_on(rx.next()).ok().unwrap(); - assert_eq!(item, Some(1)); - drop(block_on(readyrx.next()).ok().unwrap()); - let item = block_on(rx.next()).ok().unwrap().0; - assert_eq!(item, Some(2)); + assert_eq!(rx.next(), Some(Ok(1))); + assert_eq!(rx.next(), Some(Ok(2))); t.join().unwrap(); } @@ -401,7 +396,8 @@ fn stress_close_receiver_iter() { Some(r) => assert!(i == r), None => { let unwritten = unwritten_rx.recv().expect("unwritten_rx"); - assert_eq!(unwritten, i); + assert!(unwritten <= i + 1); + assert!(unwritten >= i); th.join().unwrap(); return; } diff --git a/futures-core/Cargo.toml b/futures-core/Cargo.toml index f9bafe6811..370f341379 100644 --- a/futures-core/Cargo.toml +++ b/futures-core/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-core" -version = "0.2.0" +name = "futures-core-preview" +version = "0.2.3" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -10,6 +10,9 @@ description = """ The core traits and types in for the `futures` library. """ +[lib] +name = "futures_core" + [features] default = ["std"] std = ["either/use_std"] diff --git a/futures-core/LICENSE-APACHE b/futures-core/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-core/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-core/LICENSE-MIT b/futures-core/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-core/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-core/src/future/mod.rs b/futures-core/src/future/mod.rs index f722944ebd..ab6233b719 100644 --- a/futures-core/src/future/mod.rs +++ b/futures-core/src/future/mod.rs @@ -147,7 +147,7 @@ if_std! { type Error = F::Error; fn poll(&mut self, cx: &mut task::Context) -> Poll { - unsafe { ::core::mem::Pin::get_mut(&mut self.as_pin()).poll(cx) } + unsafe { ::core::mem::PinMut::get_mut_unchecked(self.as_pin_mut()).poll(cx) } } } diff --git a/futures-core/src/lib.rs b/futures-core/src/lib.rs index e51ae48dd9..71ff16ca98 100644 --- a/futures-core/src/lib.rs +++ b/futures-core/src/lib.rs @@ -2,7 +2,7 @@ #![no_std] #![deny(missing_docs, missing_debug_implementations, warnings)] -#![doc(html_root_url = "https://docs.rs/futures-core/0.2.0")] +#![doc(html_root_url = "https://docs.rs/futures-core/0.2.2")] #![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "nightly", feature(pin))] diff --git a/futures-core/src/stream/mod.rs b/futures-core/src/stream/mod.rs index 9592b02d03..8305272f78 100644 --- a/futures-core/src/stream/mod.rs +++ b/futures-core/src/stream/mod.rs @@ -89,7 +89,7 @@ if_std! { type Error = S::Error; fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - unsafe { ::core::mem::Pin::get_mut(&mut self.as_pin()).poll_next(cx) } + unsafe { ::core::mem::PinMut::get_mut_unchecked(self.as_pin_mut()).poll_next(cx) } } } diff --git a/futures-core/src/task/atomic_waker.rs b/futures-core/src/task/atomic_waker.rs old mode 100755 new mode 100644 diff --git a/futures-core/src/task/mod.rs b/futures-core/src/task/mod.rs index 7a7dfb4dc0..bfbd3c6cc1 100644 --- a/futures-core/src/task/mod.rs +++ b/futures-core/src/task/mod.rs @@ -25,9 +25,15 @@ mod data { } -#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] +#[cfg_attr( + feature = "nightly", + cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) +)] mod atomic_waker; -#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] +#[cfg_attr( + feature = "nightly", + cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) +)] pub use self::atomic_waker::AtomicWaker; /// A map storing task-local data. diff --git a/futures-executor/Cargo.toml b/futures-executor/Cargo.toml index 36ef8c3dce..fb361e0eb3 100644 --- a/futures-executor/Cargo.toml +++ b/futures-executor/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-executor" -version = "0.2.0" +name = "futures-executor-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -10,17 +10,20 @@ description = """ Executors for asynchronous tasks based on the futures-rs library. """ +[lib] +name = "futures_executor" + [features] -std = ["num_cpus", "futures-core/std", "futures-util/std", "futures-channel/std", "lazy_static"] +std = ["num_cpus", "futures-core-preview/std", "futures-util-preview/std", "futures-channel-preview/std", "lazy_static"] default = ["std"] [dependencies] -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false} -futures-util = { path = "../futures-util", version = "0.2.0", default-features = false} -futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false} +futures-core-preview = { path = "../futures-core", version = "0.2.2", default-features = false} +futures-util-preview = { path = "../futures-util", version = "0.2.2", default-features = false} +futures-channel-preview = { path = "../futures-channel", version = "0.2.2", default-features = false} num_cpus = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true } [dev-dependencies] -futures = { path = "../futures", version = "0.2.0" } -futures-channel = { path = "../futures-channel", version = "0.2.0" } +futures-preview = { path = "../futures", version = "0.2.2" } +futures-channel-preview = { path = "../futures-channel", version = "0.2.2" } diff --git a/futures-executor/LICENSE-APACHE b/futures-executor/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-executor/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-executor/LICENSE-MIT b/futures-executor/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-executor/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-executor/benches/poll.rs b/futures-executor/benches/poll.rs old mode 100755 new mode 100644 diff --git a/futures-executor/benches/thread_notify.rs b/futures-executor/benches/thread_notify.rs old mode 100755 new mode 100644 diff --git a/futures-executor/src/enter.rs b/futures-executor/src/enter.rs index b51a5ce68b..2bde0000a7 100644 --- a/futures-executor/src/enter.rs +++ b/futures-executor/src/enter.rs @@ -6,7 +6,7 @@ thread_local!(static ENTERED: Cell = Cell::new(false)); /// Represents an executor context. /// -/// For more details, see [`enter` documentation](::enter()) +/// For more details, see [`enter`'s documentation](enter()) pub struct Enter { _a: () } @@ -22,7 +22,7 @@ pub struct EnterError { /// executor. /// /// Executor implementations should call this function before beginning to -/// execute a tasks, and drop the returned [`Enter`](Enter) value after +/// execute tasks and drop the returned [`Enter`](Enter) value after /// completing task execution: /// /// ```rust diff --git a/futures-executor/src/lib.rs b/futures-executor/src/lib.rs index b76aa237c8..17d8b0750f 100644 --- a/futures-executor/src/lib.rs +++ b/futures-executor/src/lib.rs @@ -2,7 +2,7 @@ #![no_std] #![deny(missing_docs)] -#![doc(html_root_url = "https://docs.rs/futures-executor/0.2.0")] +#![doc(html_root_url = "https://docs.rs/futures-executor/0.2.2")] #[cfg(feature = "std")] #[macro_use] diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index e5cf80f447..dcc4317325 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -209,7 +209,7 @@ pub fn block_on(f: F) -> Result { /// Turn a stream into a blocking iterator. /// -/// Whne `next` is called on the resulting `BlockingStream`, the caller +/// When `next` is called on the resulting `BlockingStream`, the caller /// will be blocked until the next element of the `Stream` becomes available. /// The default executor for the future is a global `ThreadPool`. pub fn block_on_stream(s: S) -> BlockingStream { diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs old mode 100755 new mode 100644 diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml index 6c029d6fa9..a295f6f754 100644 --- a/futures-io/Cargo.toml +++ b/futures-io/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-io" -version = "0.2.0" +name = "futures-io-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -10,13 +10,16 @@ description = """ The `AsyncRead` and `AsyncWrite` traits for the futures-rs library. """ +[lib] +name = "futures_io" + [features] -std = ["futures-core/std", "iovec"] +std = ["futures-core-preview/std", "iovec"] default = ["std"] [dependencies] -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } +futures-core-preview = { path = "../futures-core", version = "0.2.2", default-features = false } iovec = { version = "0.1", optional = true } [dev-dependencies] -futures = { path = "../futures", version = "0.2.0" } +futures-preview = { path = "../futures", version = "0.2.2" } diff --git a/futures-io/LICENSE-APACHE b/futures-io/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-io/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-io/LICENSE-MIT b/futures-io/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-io/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-io/src/lib.rs b/futures-io/src/lib.rs index 1834ca1358..97e5adfa94 100644 --- a/futures-io/src/lib.rs +++ b/futures-io/src/lib.rs @@ -6,7 +6,7 @@ #![no_std] #![deny(missing_docs, missing_debug_implementations)] -#![doc(html_rnoot_url = "https://docs.rs/futures-io/0.2.0")] +#![doc(html_rnoot_url = "https://docs.rs/futures-io/0.2.2")] macro_rules! if_std { ($($i:item)*) => ($( diff --git a/futures-macro-async/Cargo.toml b/futures-macro-async/Cargo.toml index 996c0ae2af..f094bfdfc9 100644 --- a/futures-macro-async/Cargo.toml +++ b/futures-macro-async/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-macro-async" -version = "0.2.0" +name = "futures-macro-async-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -10,14 +10,15 @@ few other assorted macros. """ [lib] +name = "futures_macro_async" proc-macro = true [dependencies] -quote = "0.5" -proc-macro2 = "0.3" +quote = "0.6" +proc-macro2 = "0.4.4" [dependencies.syn] -version = "0.13" +version = "0.14" features = ["full", "fold", "parsing", "printing", "extra-traits", "proc-macro"] default-features = false diff --git a/futures-macro-async/LICENSE-APACHE b/futures-macro-async/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-macro-async/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-macro-async/LICENSE-MIT b/futures-macro-async/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-macro-async/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-macro-async/src/lib.rs b/futures-macro-async/src/lib.rs index b88a23c1c3..5446910b4a 100644 --- a/futures-macro-async/src/lib.rs +++ b/futures-macro-async/src/lib.rs @@ -9,7 +9,6 @@ //! Currently this crate depends on `syn` and `quote` to do all the heavy //! lifting, this is just a very small shim around creating a closure/future out //! of a generator. -#![cfg_attr(feature = "nightly", feature(proc_macro))] #![recursion_limit = "128"] @@ -28,9 +27,9 @@ if_nightly! { #[macro_use] extern crate syn; - use proc_macro2::Span; + use proc_macro2::{Ident, Span}; use proc_macro::{Delimiter, Group, TokenStream, TokenTree}; - use quote::{Tokens, ToTokens}; + use quote::ToTokens; use syn::*; use syn::punctuated::Punctuated; use syn::fold::Fold; @@ -47,7 +46,7 @@ if_nightly! { fn async_inner( attribute: Attribute, function: TokenStream, - gen_function: Tokens, + gen_function: proc_macro2::TokenStream, return_ty: F, ) -> TokenStream where F: FnOnce(&Type, &[&Lifetime]) -> proc_macro2::TokenStream @@ -144,7 +143,7 @@ if_nightly! { // `ref a: B` (or some similar pattern) FnArg::Captured(ArgCaptured { pat, ty, colon_token }) => { patterns.push(pat); - let ident = Ident::from(format!("__arg_{}", i)); + let ident = Ident::new(&format!("__arg_{}", i), Span::call_site()); temp_bindings.push(ident.clone()); let pat = PatIdent { by_ref: None, @@ -187,7 +186,7 @@ if_nightly! { #( let #patterns = #temp_bindings; )* #block }; - let mut result = Tokens::new(); + let mut result = proc_macro2::TokenStream::new(); block.brace_token.surround(&mut result, |tokens| { block_inner.to_tokens(tokens); }); @@ -204,7 +203,7 @@ if_nightly! { loop { yield ::futures::__rt::Async::Pending } } }; - let mut gen_body = Tokens::new(); + let mut gen_body = proc_macro2::TokenStream::new(); block.brace_token.surround(&mut gen_body, |tokens| { gen_body_inner.to_tokens(tokens); }); @@ -228,7 +227,7 @@ if_nightly! { body_inner.into() }; - let mut body = Tokens::new(); + let mut body = proc_macro2::TokenStream::new(); block.brace_token.surround(&mut body, |tokens| { body_inner.to_tokens(tokens); }); @@ -253,7 +252,7 @@ if_nightly! { let args = syn::parse::(attribute) .expect(&format!("failed to parse attribute arguments: {}", attr)); - let attribute = Attribute::from(args.0.into_iter().map(|arg| arg.0)); + let attribute = Attribute::from(args.0.into_iter().map(|arg| arg.0.to_string())); async_inner(attribute, function, quote_cs! { ::futures::__rt::gen_future }, |output, lifetimes| { // TODO: can we lift the restriction that `futures` must be at the root of @@ -296,8 +295,8 @@ if_nightly! { let mut item_ty = None; - for (term, ty) in valued_args { - match term.as_ref() { + for (ident, ty) in valued_args { + match ident.to_string().as_str() { "item" => { if item_ty.is_some() { panic!("duplicate 'item' argument"); @@ -305,13 +304,13 @@ if_nightly! { item_ty = Some(ty); } _ => { - panic!("unexpected macro argument '{}'", quote_cs!(#term = #ty)); + panic!("unexpected macro argument '{}'", quote_cs!(#ident = #ty)); } } } let item_ty = item_ty.expect("#[async_stream] requires item type to be specified"); - let attribute = Attribute::from(args); + let attribute = Attribute::from(args.map(|arg| arg.to_string())); async_inner(attribute, function, quote_cs! { ::futures::__rt::gen_stream }, |output, lifetimes| { let return_ty = match attribute { @@ -433,7 +432,7 @@ if_nightly! { let #pat = { let r = { let pin = unsafe { - ::futures::__rt::std::mem::Pin::new_unchecked(&mut __stream) + ::futures::__rt::std::mem::PinMut::new_unchecked(&mut __stream) }; ::futures::__rt::in_ctx(|ctx| ::futures::__rt::StableStream::poll_next(pin, ctx)) }; @@ -464,7 +463,7 @@ if_nightly! { } fn first_last(tokens: &ToTokens) -> (Span, Span) { - let mut spans = Tokens::new(); + let mut spans = proc_macro2::TokenStream::new(); tokens.to_tokens(&mut spans); let good_tokens = proc_macro2::TokenStream::from(spans).into_iter().collect::>(); let first_span = good_tokens.first().map(|t| t.span()).unwrap_or(Span::call_site()); @@ -487,10 +486,12 @@ if_nightly! { fn replace_bang(input: proc_macro2::TokenStream, tokens: &ToTokens) -> proc_macro2::TokenStream { - let mut new_tokens = Tokens::new(); + let mut new_tokens = proc_macro2::TokenStream::new(); for token in input.into_iter() { match token { - proc_macro2::TokenTree::Op(op) if op.op() == '!' => tokens.to_tokens(&mut new_tokens), + proc_macro2::TokenTree::Punct(ref punct) if punct.as_char() == '!' => { + tokens.to_tokens(&mut new_tokens) + }, _ => token.to_tokens(&mut new_tokens), } } @@ -501,10 +502,10 @@ if_nightly! { -> proc_macro2::TokenStream { let mut replacements = replacements.iter().cycle(); - let mut new_tokens = Tokens::new(); + let mut new_tokens = proc_macro2::TokenStream::new(); for token in input.into_iter() { match token { - proc_macro2::TokenTree::Op(op) if op.op() == '!' => { + proc_macro2::TokenTree::Punct(ref punct) if punct.as_char() == '!' => { replacements.next().unwrap().to_tokens(&mut new_tokens); } _ => token.to_tokens(&mut new_tokens), @@ -525,8 +526,8 @@ if_nightly! { impl synom::Synom for AsyncArgs { named!(parse -> Self, map!( - option!(parens!(call!(Punctuated::::parse_separated_nonempty))), - |p| AsyncArgs(p.map(|d| d.1.into_iter().collect()).unwrap_or_default()) + call!(Punctuated::::parse_separated), + |p| AsyncArgs(p.into_iter().collect()) )); } @@ -546,8 +547,8 @@ if_nightly! { impl synom::Synom for AsyncStreamArgs { named!(parse -> Self, map!( - option!(parens!(call!(Punctuated::::parse_separated_nonempty))), - |p| AsyncStreamArgs(p.map(|d| d.1.into_iter().collect()).unwrap_or_default()) + call!(Punctuated::::parse_separated), + |p| AsyncStreamArgs(p.into_iter().collect()) )); } } diff --git a/futures-macro-await/Cargo.toml b/futures-macro-await/Cargo.toml index 205aea19b7..4e5080f035 100644 --- a/futures-macro-await/Cargo.toml +++ b/futures-macro-await/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-macro-await" -version = "0.2.0" +name = "futures-macro-await-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -8,8 +8,11 @@ description = """ Definition of the `await!` macro for the `futures` crate. """ +[lib] +name = "futures_macro_await" + [dev-dependencies] -futures = { path = "../futures" } +futures-preview = { path = "../futures" } [features] -nightly = ["futures/nightly"] +nightly = ["futures-preview/nightly"] diff --git a/futures-macro-await/LICENSE-APACHE b/futures-macro-await/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-macro-await/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-macro-await/LICENSE-MIT b/futures-macro-await/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-macro-await/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-macro-await/src/lib.rs b/futures-macro-await/src/lib.rs index ffef6d1b06..da1aaf75a4 100644 --- a/futures-macro-await/src/lib.rs +++ b/futures-macro-await/src/lib.rs @@ -16,10 +16,11 @@ /// #[cfg_attr(feature = "nightly", doc = "```")] #[cfg_attr(not(feature = "nightly"), doc = "```ignore")] -/// #![feature(proc_macro, generators, pin)] +/// #![feature(use_extern_macros, generators, pin)] /// extern crate futures; /// /// use futures::prelude::*; +/// use futures::prelude::await; /// use futures::future; /// use futures::stable::block_on_stable; /// @@ -39,12 +40,12 @@ macro_rules! await { let future = &mut future; // The above borrow is necessary to force a borrow across a // yield point, proving that we're currently in an immovable - // generator, making the below `Pin::new_unchecked` call + // generator, making the below `PinMut::new_unchecked` call // safe. loop { let poll = ::futures::__rt::in_ctx(|ctx| { let pin = unsafe { - ::futures::__rt::std::mem::Pin::new_unchecked(future) + ::futures::__rt::std::mem::PinMut::new_unchecked(future) }; ::futures::__rt::StableFuture::poll(pin, ctx) }); @@ -77,10 +78,11 @@ macro_rules! await { /// #[cfg_attr(feature = "nightly", doc = "```")] #[cfg_attr(not(feature = "nightly"), doc = "```ignore")] -/// #![feature(proc_macro, generators, pin)] +/// #![feature(use_extern_macros, generators, pin)] /// extern crate futures; /// /// use futures::prelude::*; +/// use futures::prelude::async; /// use futures::stream; /// use futures::stable::block_on_stable; /// @@ -126,10 +128,11 @@ macro_rules! await_item { /// #[cfg_attr(feature = "nightly", doc = "```")] #[cfg_attr(not(feature = "nightly"), doc = "```ignore")] -/// #![feature(proc_macro, generators, pin)] +/// #![feature(use_extern_macros, generators, pin)] /// extern crate futures; /// /// use futures::prelude::*; +/// use futures::prelude::await; /// use futures::stream; /// use futures::executor::block_on; /// diff --git a/futures-sink/Cargo.toml b/futures-sink/Cargo.toml index 72cfd79e24..8316851b5f 100644 --- a/futures-sink/Cargo.toml +++ b/futures-sink/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-sink" -version = "0.2.0" +name = "futures-sink-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -10,11 +10,14 @@ description = """ The asynchronous `Sink` trait for the futures-rs library. """ +[lib] +name = "futures_sink" + [features] -std = ["either/use_std", "futures-core/std", "futures-channel/std"] +std = ["either/use_std", "futures-core-preview/std", "futures-channel-preview/std"] default = ["std"] [dependencies] either = { version = "1.4", default-features = false, optional = true } -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false } +futures-core-preview = { path = "../futures-core", version = "0.2.2", default-features = false } +futures-channel-preview = { path = "../futures-channel", version = "0.2.2", default-features = false } diff --git a/futures-sink/LICENSE-APACHE b/futures-sink/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-sink/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-sink/LICENSE-MIT b/futures-sink/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-sink/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-sink/src/channel_impls.rs b/futures-sink/src/channel_impls.rs index f79c4b6bab..2aec273bb4 100644 --- a/futures-sink/src/channel_impls.rs +++ b/futures-sink/src/channel_impls.rs @@ -14,8 +14,14 @@ impl Sink for Sender { self.start_send(msg) } - fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { - Ok(Async::Ready(())) + fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> { + match self.poll_ready(cx) { + Err(ref e) if e.is_disconnected() => { + // If the receiver disconnected, we consider the sink to be flushed. + Ok(Async::Ready(())) + } + x => x, + } } fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> { diff --git a/futures-sink/src/lib.rs b/futures-sink/src/lib.rs index 0da8a92215..96bbadb896 100644 --- a/futures-sink/src/lib.rs +++ b/futures-sink/src/lib.rs @@ -5,7 +5,7 @@ #![no_std] #![deny(missing_docs, missing_debug_implementations)] -#![doc(html_root_url = "https://docs.rs/futures-sink/0.2.0")] +#![doc(html_root_url = "https://docs.rs/futures-sink/0.2.2")] #[cfg(feature = "std")] extern crate std; @@ -185,7 +185,7 @@ pub trait Sink { fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError>; /// Begin the process of sending a value to the sink. - /// Each call to this function must be proceeded by a successful call to + /// Each call to this function must be preceded by a successful call to /// `poll_ready` which returned `Ok(Async::Ready(()))`. /// /// As the name suggests, this method only *begins* the process of sending diff --git a/futures-stable/Cargo.toml b/futures-stable/Cargo.toml index b71a5b9412..23e028d852 100644 --- a/futures-stable/Cargo.toml +++ b/futures-stable/Cargo.toml @@ -1,22 +1,25 @@ [package] -name = "futures-stable" +name = "futures-stable-preview" description = "futures which support internal references" -version = "0.2.0" +version = "0.2.3" authors = ["boats "] license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" -[dependencies.futures-core] +[lib] +name = "futures_stable" + +[dependencies.futures-core-preview] path = "../futures-core" -version = "0.2.0" +version = "0.2.2" default-features = false -[dependencies.futures-executor] +[dependencies.futures-executor-preview] path = "../futures-executor" -version = "0.2.0" +version = "0.2.2" default-features = false [features] -nightly = ["futures-core/nightly"] -std = ["futures-core/std", "futures-executor/std"] +nightly = ["futures-core-preview/nightly"] +std = ["futures-core-preview/std", "futures-executor-preview/std"] default = ["std"] diff --git a/futures-stable/LICENSE-APACHE b/futures-stable/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-stable/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-stable/LICENSE-MIT b/futures-stable/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-stable/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-stable/src/executor.rs b/futures-stable/src/executor.rs index 919cf78a1f..1b85995be4 100644 --- a/futures-stable/src/executor.rs +++ b/futures-stable/src/executor.rs @@ -7,7 +7,11 @@ use futures_executor::{ThreadPool, LocalPool, LocalExecutor}; use StableFuture; use UnsafePin; +/// Executors which can spawn `PinBox`ed futures. +/// +/// These executors can be used in combination with `StableFuture`s. pub trait StableExecutor: Executor { + /// Spawn a `PinBox` future. fn spawn_pinned(&mut self, f: PinBox + Send>) -> Result<(), SpawnError>; } @@ -23,6 +27,7 @@ impl StableExecutor for LocalExecutor { } } +/// Block on the result of a `StableFuture`. pub fn block_on_stable(f: F) -> Result { let mut pool = LocalPool::new(); let mut exec = pool.executor(); diff --git a/futures-stable/src/lib.rs b/futures-stable/src/lib.rs index 730baa721f..b52469bcb0 100644 --- a/futures-stable/src/lib.rs +++ b/futures-stable/src/lib.rs @@ -20,7 +20,7 @@ if_nightly! { extern crate futures_core; extern crate futures_executor; - use core::mem::Pin; + use core::mem::PinMut; use futures_core::{Future, Stream, Poll, task}; if_std! { @@ -35,12 +35,33 @@ if_nightly! { use unsafe_pin::UnsafePin; } + /// A trait for `Future`s which can be pinned to a particular location in memory. + /// + /// These futures take `self` by `PinMut`, rather than `&mut Self`. + /// This allows types which are not [`Unpin`](::std::marker::Unpin) to guarantee + /// that they won't be moved after being polled. Since they won't be moved, it's + /// possible for them to safely contain references to themselves. + /// + /// The most common examples of such self-referential `StableFuture`s are `#[async]` + /// functions and `async_block!`s. + /// + /// All types which implement `Future` also implement `StableFuture` automatically. pub trait StableFuture { + /// A successful value type Item; + + /// An error type Error; - fn poll(self: Pin, ctx: &mut task::Context) -> Poll; + /// Attempt to resolve the future to a final value, registering the current task + /// for wakeup if the value is not yet available. + /// + /// This method takes `self` by `PinMut`, and so calling it requires putting `Self` + /// in a [`PinBox`](::std::boxed::PinBox) using the `pin` method, or otherwise + /// guaranteeing that the location of `self` will not change after a call to `poll`. + fn poll(self: PinMut, ctx: &mut task::Context) -> Poll; + /// Pin the future to a particular location by placing it on the heap. #[cfg(feature = "std")] fn pin<'a>(self) -> PinBox + Send + 'a> where Self: Send + Sized + 'a @@ -48,6 +69,10 @@ if_nightly! { PinBox::new(unsafe { UnsafePin::new(self) }) } + /// Pin the future to a particular location by placing it on the heap. + /// + /// This method is the same as `pin`, but doesn't require that `Self` can be + /// safely sent across threads. `pin` should be preferred where possible. #[cfg(feature = "std")] fn pin_local<'a>(self) -> PinBox + 'a> where Self: Sized + 'a @@ -60,17 +85,37 @@ if_nightly! { type Item = F::Item; type Error = F::Error; - fn poll(mut self: Pin, ctx: &mut task::Context) -> Poll { - F::poll(unsafe { Pin::get_mut(&mut self) }, ctx) + fn poll(self: PinMut, ctx: &mut task::Context) -> Poll { + F::poll(unsafe { PinMut::get_mut_unchecked(self) }, ctx) } } + /// A trait for `Stream`s which can be pinned to a particular location in memory. + /// + /// These streams take `self` by `PinMut`, rather than `&mut Self`. + /// This allows types which are not [`Unpin`](::std::marker::Unpin) to guarantee + /// that they won't be moved after being polled. Since they won't be moved, it's + /// possible for them to safely contain references to themselves. + /// + /// The most common examples of such self-referential `StableStream`s are + /// `#[async_stream(item = Foo)]` functions. + /// + /// All types which implement `Stream` also implement `StableStream` automatically. pub trait StableStream { + /// A successful value type Item; + /// An error type Error; - fn poll_next(self: Pin, ctx: &mut task::Context) -> Poll, Self::Error>; + /// Attempt to resolve the stream to the next value, registering the current task + /// for wakeup if the value is not yet available. + /// + /// This method takes `self` by `PinMut`, and so calling it requires putting `Self` + /// in a [`PinBox`](::std::boxed::PinBox) using the `pin` method, or otherwise + /// guaranteeing that the location of `self` will not change after a call to `poll`. + fn poll_next(self: PinMut, ctx: &mut task::Context) -> Poll, Self::Error>; + /// Pin the stream to a particular location by placing it on the heap. #[cfg(feature = "std")] fn pin<'a>(self) -> PinBox + Send + 'a> where Self: Send + Sized + 'a @@ -78,6 +123,10 @@ if_nightly! { PinBox::new(unsafe { UnsafePin::new(self) }) } + /// Pin the stream to a particular location by placing it on the heap. + /// + /// This method is the same as `pin`, but doesn't require that `Self` can be + /// safely sent across threads. `pin` should be preferred where possible. #[cfg(feature = "std")] fn pin_local<'a>(self) -> PinBox + 'a> where Self: Sized + 'a @@ -90,8 +139,8 @@ if_nightly! { type Item = S::Item; type Error = S::Error; - fn poll_next(mut self: Pin, ctx: &mut task::Context) -> Poll, Self::Error> { - S::poll_next(unsafe { Pin::get_mut(&mut self) }, ctx) + fn poll_next(self: PinMut, ctx: &mut task::Context) -> Poll, Self::Error> { + S::poll_next(unsafe { PinMut::get_mut_unchecked(self) }, ctx) } } } diff --git a/futures-stable/src/unsafe_pin.rs b/futures-stable/src/unsafe_pin.rs index 2640c704f6..bfbdecd272 100644 --- a/futures-stable/src/unsafe_pin.rs +++ b/futures-stable/src/unsafe_pin.rs @@ -1,4 +1,4 @@ -use core::mem::Pin; +use core::mem::PinMut; use futures_core::{Future, Stream, Poll, task}; use {StableFuture, StableStream}; @@ -17,7 +17,7 @@ impl<'a, T: StableFuture> Future for UnsafePin { type Item = T::Item; type Error = T::Error; fn poll(&mut self, ctx: &mut task::Context) -> Poll { - T::poll(unsafe { Pin::new_unchecked(&mut self.inner) }, ctx) + T::poll(unsafe { PinMut::new_unchecked(&mut self.inner) }, ctx) } } @@ -25,6 +25,6 @@ impl<'a, T: StableStream> Stream for UnsafePin { type Item = T::Item; type Error = T::Error; fn poll_next(&mut self, ctx: &mut task::Context) -> Poll, Self::Error> { - T::poll_next(unsafe { Pin::new_unchecked(&mut self.inner) }, ctx) + T::poll_next(unsafe { PinMut::new_unchecked(&mut self.inner) }, ctx) } } diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 31eab056f0..df969b4a42 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "futures-util" -version = "0.2.0" +name = "futures-util-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" repository = "https://github.com/rust-lang-nursery/futures-rs" @@ -10,19 +10,22 @@ description = """ Common utilities and extension traits for the futures-rs library. """ +[lib] +name = "futures_util" + [features] -std = ["futures-core/std", "futures-io/std", "futures-sink/std", "either/use_std"] -default = ["std", "futures-core/either", "futures-sink/either"] +std = ["futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "either/use_std"] +default = ["std", "futures-core-preview/either", "futures-sink-preview/either"] bench = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false } -futures-io = { path = "../futures-io", version = "0.2.0", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false} +futures-core-preview = { path = "../futures-core", version = "0.2.2", default-features = false } +futures-channel-preview = { path = "../futures-channel", version = "0.2.2", default-features = false } +futures-io-preview = { path = "../futures-io", version = "0.2.2", default-features = false } +futures-sink-preview = { path = "../futures-sink", version = "0.2.2", default-features = false} either = { version = "1.4", default-features = false } [dev-dependencies] -futures = { path = "../futures", version = "0.2.0" } -futures-executor = { path = "../futures-executor", version = "0.2.0" } -futures-channel = { path = "../futures-channel", version = "0.2.0" } +futures-preview = { path = "../futures", version = "0.2.2" } +futures-executor-preview = { path = "../futures-executor", version = "0.2.2" } +futures-channel-preview = { path = "../futures-channel", version = "0.2.2" } diff --git a/futures-util/LICENSE-APACHE b/futures-util/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures-util/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures-util/LICENSE-MIT b/futures-util/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures-util/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures-util/benches/bilock.rs b/futures-util/benches/bilock.rs old mode 100755 new mode 100644 diff --git a/futures-util/benches/futures_unordered.rs b/futures-util/benches/futures_unordered.rs old mode 100755 new mode 100644 diff --git a/futures-util/src/future/select_all.rs b/futures-util/src/future/select_all.rs index 38c2594a32..c467982c20 100644 --- a/futures-util/src/future/select_all.rs +++ b/futures-util/src/future/select_all.rs @@ -59,7 +59,7 @@ impl Future for SelectAll }).next(); match item { Some((idx, res)) => { - self.inner.remove(idx); + self.inner.swap_remove(idx); let rest = mem::replace(&mut self.inner, Vec::new()); match res { Ok(e) => Ok(Async::Ready((e, idx, rest))), diff --git a/futures-util/src/future/shared.rs b/futures-util/src/future/shared.rs index c7b10adf00..07a5f23f67 100644 --- a/futures-util/src/future/shared.rs +++ b/futures-util/src/future/shared.rs @@ -234,7 +234,7 @@ impl Wake for Notifier { } } -unsafe impl Sync for Inner +unsafe impl Sync for Inner where F: Future + Send, F::Item: Send + Sync, F::Error: Send + Sync @@ -264,6 +264,13 @@ pub struct SharedItem { item: Arc, } +impl SharedItem { + /// Expose the inner Arc + pub fn into_inner(self) -> Arc { + self.item + } +} + impl ops::Deref for SharedItem { type Target = T; @@ -279,6 +286,13 @@ pub struct SharedError { error: Arc, } +impl SharedError { + /// Expose the inner Arc + pub fn into_inner(self) -> Arc { + self.error + } +} + impl ops::Deref for SharedError { type Target = E; diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index 970eab2db4..1b685b9fa1 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -2,7 +2,7 @@ //! and the `AsyncRead` and `AsyncWrite` traits. #![no_std] -#![deny(missing_docs, missing_debug_implementations, warnings)] +#![deny(missing_docs, missing_debug_implementations)] #![doc(html_root_url = "https://docs.rs/futures/0.1")] #[cfg(test)] diff --git a/futures-util/src/sink/err_into.rs b/futures-util/src/sink/err_into.rs index 44a68fad1f..ede795b875 100644 --- a/futures-util/src/sink/err_into.rs +++ b/futures-util/src/sink/err_into.rs @@ -6,7 +6,7 @@ use sink::{SinkExt, SinkMapErr}; /// A sink combinator to change the error type of a sink. /// /// This is created by the `Sink::err_into` method. -#[derive(Debug)] +#[derive(Clone, Debug)] #[must_use = "futures do nothing unless polled"] pub struct SinkErrInto { sink: SinkMapErr E>, diff --git a/futures-util/src/sink/fanout.rs b/futures-util/src/sink/fanout.rs index a6924facf8..683f3e0136 100644 --- a/futures-util/src/sink/fanout.rs +++ b/futures-util/src/sink/fanout.rs @@ -8,6 +8,7 @@ use futures_sink::{ Sink}; /// /// Backpressure from any downstream sink propagates up, which means that this sink /// can only process items as fast as its _slowest_ downstream sink. +#[derive(Clone)] pub struct Fanout { left: A, right: B diff --git a/futures-util/src/sink/map_err.rs b/futures-util/src/sink/map_err.rs index 379a3f0f2c..cbdff99dde 100644 --- a/futures-util/src/sink/map_err.rs +++ b/futures-util/src/sink/map_err.rs @@ -3,7 +3,7 @@ use futures_core::task; use futures_sink::{Sink}; /// Sink for the `Sink::sink_map_err` combinator. -#[derive(Debug)] +#[derive(Clone, Debug)] #[must_use = "sinks do nothing unless polled"] pub struct SinkMapErr { sink: S, diff --git a/futures-util/src/sink/with.rs b/futures-util/src/sink/with.rs index df03f95b82..2a47a81852 100644 --- a/futures-util/src/sink/with.rs +++ b/futures-util/src/sink/with.rs @@ -7,7 +7,7 @@ use futures_sink::{Sink}; /// Sink for the `Sink::with` combinator, chaining a computation to run *prior* /// to pushing a value into the underlying sink. -#[derive(Debug)] +#[derive(Clone, Debug)] #[must_use = "sinks do nothing unless polled"] pub struct With where S: Sink, @@ -20,7 +20,7 @@ pub struct With _phantom: PhantomData, } -#[derive(Debug)] +#[derive(Clone, Debug)] enum State { Empty, Process(Fut), diff --git a/futures-util/src/stream/for_each_concurrent.rs b/futures-util/src/stream/for_each_concurrent.rs new file mode 100644 index 0000000000..ef9fe164f4 --- /dev/null +++ b/futures-util/src/stream/for_each_concurrent.rs @@ -0,0 +1,77 @@ +use futures_core::{Async, Future, IntoFuture, Poll, Stream}; +use futures_core::task; + +use super::futures_unordered::FuturesUnordered; + +/// A stream combinator which executes a unit closure over each item on a +/// stream concurrently. +/// +/// This structure is returned by the `Stream::for_each` method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct ForEachConcurrent where U: IntoFuture { + stream: Option, + stream_done: bool, + f: F, + futures: FuturesUnordered, +} + +pub fn new(s: S, f: F) -> ForEachConcurrent + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture, +{ + ForEachConcurrent { + stream: Some(s), + stream_done: false, + f: f, + futures: FuturesUnordered::new(), + } +} + +impl Future for ForEachConcurrent + where S: Stream, + F: FnMut(S::Item) -> U, + U: IntoFuture, +{ + type Item = S; + type Error = S::Error; + + fn poll(&mut self, cx: &mut task::Context) -> Poll { + loop { + let mut made_progress_this_iter = false; + + // Try and pull an item off of the stream + if !self.stream_done { + // `unwrap` is valid because the stream is only taken after `stream_done` is set + match self.stream.as_mut().unwrap().poll_next(cx)? { + Async::Ready(Some(x)) => { + self.futures.push((self.f)(x).into_future()); + made_progress_this_iter = true; + } + // The stream completed, so it shouldn't be polled + // anymore. + Async::Ready(None) => self.stream_done = true, + Async::Pending => {}, + } + } + + match self.futures.poll_next(cx)? { + Async::Ready(Some(())) => made_progress_this_iter = true, + Async::Ready(None) if self.stream_done => { + // We've processed all of self.futures and self.stream, + // so return self.stream + return Ok(Async::Ready(self.stream.take().expect( + "polled for_each_concurrent after completion" + ))); + } + Async::Ready(None) + | Async::Pending => {} + } + + if !made_progress_this_iter { + return Ok(Async::Pending); + } + } + } +} diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 1a6d65ae89..2d96222983 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -85,6 +85,7 @@ if_std! { mod catch_unwind; mod chunks; mod collect; + mod for_each_concurrent; mod select_all; mod split; mod futures_unordered; @@ -96,6 +97,7 @@ if_std! { pub use self::collect::Collect; pub use self::select_all::{select_all, SelectAll}; pub use self::split::{SplitStream, SplitSink, ReuniteError}; + pub use self::for_each_concurrent::ForEachConcurrent; pub use self::futures_unordered::{futures_unordered, FuturesUnordered}; pub use self::futures_ordered::{futures_ordered, FuturesOrdered}; } @@ -584,10 +586,10 @@ pub trait StreamExt: Stream { /// to successfully, producing a future. That future will then be executed /// to completion before moving on to the next item. /// - /// The returned value is a `Future` where the `Item` type is `()` and - /// errors are otherwise threaded through. Any error on the stream or in the - /// closure will cause iteration to be halted immediately and the future - /// will resolve to that error. + /// The returned value is a `Future` where the `Item` type is the completed + /// stream, and errors are otherwise threaded through. Any error on the + /// stream or in the provided future will cause iteration to be halted + /// immediately and the future will resolve to that error. /// /// To process each item in the stream and produce another stream instead /// of a single future, use `and_then` instead. @@ -599,6 +601,30 @@ pub trait StreamExt: Stream { for_each::new(self, f) } + /// Runs this stream to completion, executing the provided closure for each + /// element on the stream. This is similar to `for_each` but may begin + /// processing an element while previous elements are still being processed. + /// + /// When this stream successfully resolves to an item, the closure will be + /// called to produce a future. That future will then be added to + /// the set of futures to resolve. + /// + /// The returned value is a `Future` where the `Item` type is the completed + /// stream, and errors are otherwise threaded through. Any error on the + /// stream or in the provided future will cause iteration to be halted + /// immediately and the future will resolve to that error. + /// + /// To process each item in the stream and produce another stream instead + /// of a single future, use `and_then` instead. + #[cfg(feature = "std")] + fn for_each_concurrent(self, f: F) -> ForEachConcurrent + where F: FnMut(Self::Item) -> U, + U: IntoFuture, + Self: Sized + { + for_each_concurrent::new(self, f) + } + /// Map this stream's error to a different type using the `Into` trait. /// /// This function does for streams what `try!` does for `Result`, diff --git a/futures-util/src/stream/select_all.rs b/futures-util/src/stream/select_all.rs index 8b6925846f..0c9c581533 100644 --- a/futures-util/src/stream/select_all.rs +++ b/futures-util/src/stream/select_all.rs @@ -67,13 +67,16 @@ impl Stream for SelectAll { type Error = S::Error; fn poll_next(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - match self.inner.poll_next(cx).map_err(|(err, _)| err)? { - Async::Pending => Ok(Async::Pending), - Async::Ready(Some((Some(item), remaining))) => { - self.push(remaining); - Ok(Async::Ready(Some(item))) + loop { + match self.inner.poll_next(cx).map_err(|(err, _)| err)? { + Async::Pending => return Ok(Async::Pending), + Async::Ready(Some((Some(item), remaining))) => { + self.push(remaining); + return Ok(Async::Ready(Some(item))); + } + Async::Ready(Some((None, _))) => {} + Async::Ready(None) => return Ok(Async::Ready(None)), } - Async::Ready(_) => Ok(Async::Ready(None)), } } } diff --git a/futures-util/src/stream/take_while.rs b/futures-util/src/stream/take_while.rs index f5564c29d3..8693820556 100644 --- a/futures-util/src/stream/take_while.rs +++ b/futures-util/src/stream/take_while.rs @@ -92,6 +92,7 @@ impl Stream for TakeWhile }, Ok(Async::Ready(false)) => { self.done_taking = true; + self.pending = None; Ok(Async::Ready(None)) } Ok(Async::Pending) => Ok(Async::Pending), diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 2d6060187b..0d999fa35d 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "futures" -version = "0.2.0" +name = "futures-preview" +version = "0.2.2" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" -readme = "README.md" +readme = "../README.md" keywords = ["futures", "async", "future"] repository = "https://github.com/rust-lang-nursery/futures-rs" homepage = "https://github.com/rust-lang-nursery/futures-rs" @@ -14,23 +14,29 @@ composability, and iterator-like interfaces. """ categories = ["asynchronous"] +[lib] +name = "futures" + [badges] travis-ci = { repository = "rust-lang-nursery/futures-rs" } appveyor = { repository = "rust-lang-nursery/futures-rs" } [dependencies] -futures-async-runtime = { path = "../futures-async-runtime", version = "0.2.0", default-features = false } -futures-core = { path = "../futures-core", version = "0.2.0", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false } -futures-executor = { path = "../futures-executor", version = "0.2.0", default-features = false } -futures-io = { path = "../futures-io", version = "0.2.0", default-features = false } -futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false } -futures-stable = { path = "../futures-stable", version = "0.2.0", default-features = false } -futures-util = { path = "../futures-util", version = "0.2.0", default-features = false } -futures-macro-async = { path = "../futures-macro-async", version = "0.2.0", optional = true } -futures-macro-await = { path = "../futures-macro-await", version = "0.2.0", optional = true } +#futures-async-runtime-preview = { path = "../futures-async-runtime", version = "0.2.2", default-features = false } +#futures-core-preview = { path = "../futures-core", version = "0.2.2", default-features = false } +#futures-channel-preview = { path = "../futures-channel", version = "0.2.2", default-features = false } +#futures-executor-preview = { path = "../futures-executor", version = "0.2.2", default-features = false } +#futures-io-preview = { path = "../futures-io", version = "0.2.2", default-features = false } +#futures-sink-preview = { path = "../futures-sink", version = "0.2.2", default-features = false } +#futures-stable-preview = { path = "../futures-stable", version = "0.2.2", default-features = false } +#futures-util-preview = { path = "../futures-util", version = "0.2.2", default-features = false } +#futures-macro-async-preview = { path = "../futures-macro-async", version = "0.2.2", optional = true } +#futures-macro-await-preview = { path = "../futures-macro-await", version = "0.2.2", optional = true } [features] -nightly = ["futures-core/nightly", "futures-stable/nightly", "futures-async-runtime/nightly", "futures-macro-async", "futures-macro-await", "futures-macro-async/nightly"] -std = ["futures-core/std", "futures-executor/std", "futures-io/std", "futures-sink/std", "futures-stable/std", "futures-util/std", "futures-async-runtime/std"] -default = ["std"] +#nightly = ["futures-core-preview/nightly", "futures-stable-preview/nightly", "futures-async-runtime-preview/nightly", "futures-macro-async-preview", "futures-macro-await-preview", "futures-macro-async-preview/nightly"] +#std = ["futures-core-preview/std", "futures-executor-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-stable-preview/std", "futures-util-preview/std", "futures-async-runtime-preview/std"] +#default = ["std"] + +[package.metadata.docs.rs] +rustdoc-args = ["--html-in-header", ".cargo/registry/src/github.amrom.workers.dev-1ecc6299db9ec823/futures-0.2.3-docs-yank.2/yanked-redirect.html"] diff --git a/futures/LICENSE-APACHE b/futures/LICENSE-APACHE new file mode 120000 index 0000000000..965b606f33 --- /dev/null +++ b/futures/LICENSE-APACHE @@ -0,0 +1 @@ +../LICENSE-APACHE \ No newline at end of file diff --git a/futures/LICENSE-MIT b/futures/LICENSE-MIT new file mode 120000 index 0000000000..76219eb72e --- /dev/null +++ b/futures/LICENSE-MIT @@ -0,0 +1 @@ +../LICENSE-MIT \ No newline at end of file diff --git a/futures/src/lib.rs b/futures/src/lib.rs index fcfc219fa5..a4c4b12def 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -1,3 +1,4 @@ +/* //! Abstractions for asynchronous programming. //! //! This crate provides a number of core abstractions for writing asynchronous code: @@ -20,10 +21,10 @@ //! completion, but *do not block* the thread running them. #![no_std] -#![doc(html_root_url = "https://docs.rs/futures/0.2.0")] +#![doc(html_root_url = "https://docs.rs/futures/0.2.2")] -#![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))] #![cfg_attr(feature = "nightly", feature(use_extern_macros))] +#![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))] extern crate futures_async_runtime; extern crate futures_core; @@ -126,7 +127,7 @@ pub mod executor { //! # use futures::future::{Future, lazy}; //! # let my_app: Box> = Box::new(lazy(|_| Ok(()))); //! - //! // assumping `my_app: Future` + //! // assuming `my_app: Future` //! ThreadPool::new().expect("Failed to create threadpool").run(my_app); //! ``` //! @@ -138,7 +139,7 @@ pub mod executor { //! //! There are two ways to spawn a task: //! - //! - Spawn onto a "default" execuctor by calling the top-level + //! - Spawn onto a "default" executor by calling the top-level //! [`spawn`](::executor::spawn) function or [pulling the executor from the //! task context](::task::Context::executor). //! @@ -392,7 +393,10 @@ pub mod task { Context, LocalMap, Waker, UnsafeWake, }; - #[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))] + #[cfg_attr( + feature = "nightly", + cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) + )] pub use futures_core::task::AtomicWaker; #[cfg(feature = "std")] @@ -401,6 +405,92 @@ pub mod task { #[cfg(feature = "nightly")] pub mod stable { + //! `async/await` futures which can be pinned to a particular location. + //! + //! This module contains: + //! + //! - The [`StableFuture`](::StableFuture) and [`StableStream`](::StableStream) + //! traits which allow for immovable, self-referential `Future`s and `Streams`. + //! + //! - The [`StableExecutor`](::StableExecutor) trait for `Executor`s which + //! take [`PinBox`](::std::boxed:PinBox)ed `Future`s. + //! + //! - A [`block_on_stable`](::block_on_stable) function for blocking on + //! `StableFuture`s. + //! + //! These immovable future types are most commonly used with the async/await + //! macros, which are included in the prelude. These macros can be used to + //! write asynchronous code in an ergonomic blocking style: + //! + //! ```rust + //! #![feature(use_extern_macros, proc_macro_non_items, generators, pin)] + //! use futures::prelude::*; + //! use futures::prelude::await; + //! + //! /// A simple async function which returns immediately once polled: + //! #[async] + //! fn foo() -> Result { + //! Ok(1) + //! } + //! + //! /// Async functions can `await!` the result of other async functions: + //! #[async] + //! fn bar() -> Result { + //! let foo_num = await!(foo())?; + //! Ok(foo_num + 5) + //! } + //! + //! /// Async functions can also choose to return a `Box`ed `Future` type. + //! /// To opt into `Send`able futures, use `#[async(boxed, send)]`. + //! + //! #[async(boxed)] + //! fn boxed(x: i32) -> Result { + //! Ok( + //! await!(foo())? + await!(bar())? + x + //! ) + //! } + //! + //! /// Async expressions can also be written in `async_block!`s: + //! fn async_block() -> impl StableFuture { + //! println!("Runs before the future is returned"); + //! async_block! { + //! println!("Runs the first time the future is polled"); + //! Ok(5) + //! } + //! } + //! + //! /// The futures that result from async functions can be pinned and used + //! /// with other `Future` combinators: + //! #[async] + //! fn join_two_futures() -> Result<(i32, i32), i32> { + //! let joined = foo().pin().join(bar().pin()); + //! await!(joined) + //! } + //! + //! /// Streams can also be written in this style using the + //! /// `#[async_stream(item = ItemType)]` macro. The `stream_yield!` + //! /// macro is used to yield elements, and the `async_stream_block!` + //! /// macro can be used to write async streams inside other functions: + //! #[async_stream(boxed, send, item = u64)] + //! fn stream_boxed() -> Result<(), i32> { + //! let foo_result = await!(foo())?; + //! stream_yield!(foo_result as u64); + //! stream_yield!(22); + //! Ok(()) + //! } + //! + //! /// Finally #[async] can be used on `for` loops to loop over the results + //! /// of a stream: + //! #[async] + //! fn async_for() -> Result<(), i32> { + //! #[async] + //! for i in stream_boxed() { + //! println!("yielded {}", i); + //! } + //! Ok(()) + //! } + //! ``` + pub use futures_stable::{StableFuture, StableStream}; #[cfg(feature = "std")] @@ -414,3 +504,4 @@ pub mod __rt { pub extern crate std; pub use futures_async_runtime::*; } +*/ diff --git a/futures/testcrate/Cargo.toml b/futures/testcrate/Cargo.toml index 96ed23238c..15b5efd6cd 100644 --- a/futures/testcrate/Cargo.toml +++ b/futures/testcrate/Cargo.toml @@ -6,7 +6,7 @@ authors = ["Alex Crichton "] [lib] path = "lib.rs" -[dependencies.futures] +[dependencies.futures-preview] features = ["std", "nightly"] path = ".." diff --git a/futures/testcrate/ui/bad-return-type.rs b/futures/testcrate/ui/bad-return-type.rs index 5be93f5285..264a91d701 100644 --- a/futures/testcrate/ui/bad-return-type.rs +++ b/futures/testcrate/ui/bad-return-type.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro, generators, pin)] +#![feature(use_extern_macros, proc_macro_non_items, generators, pin)] extern crate futures; diff --git a/futures/testcrate/ui/bad-return-type.stderr b/futures/testcrate/ui/bad-return-type.stderr index 1e71aa7ab8..2791fb0b5a 100644 --- a/futures/testcrate/ui/bad-return-type.stderr +++ b/futures/testcrate/ui/bad-return-type.stderr @@ -23,7 +23,7 @@ error[E0308]: mismatched types found type `{integer}` = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) -error[E0907]: type inside generator must be known in this context +error[E0698]: type inside generator must be known in this context --> $DIR/bad-return-type.rs:19:9 | 19 | let val = Some(42); @@ -36,7 +36,7 @@ note: the type is part of the generator because of this `yield` | ^^^^^^^^^^^^^^^^^^^ = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) -error[E0907]: type inside generator must be known in this context +error[E0698]: type inside generator must be known in this context --> $DIR/bad-return-type.rs:24:9 | 24 | let val = val.unwrap(); @@ -49,7 +49,7 @@ note: the type is part of the generator because of this `yield` | ^^^^^^^^^^^^^^^^^^^ = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) -error[E0907]: type inside generator must be known in this context +error[E0698]: type inside generator must be known in this context --> $DIR/bad-return-type.rs:25:5 | 25 | stream_yield!(val); @@ -62,7 +62,7 @@ note: the type is part of the generator because of this `yield` | ^^^^^^^^^^^^^^^^^^^ = note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info) -error[E0907]: type inside generator must be known in this context +error[E0698]: type inside generator must be known in this context --> $DIR/bad-return-type.rs:25:5 | 25 | stream_yield!(val); @@ -86,5 +86,5 @@ error[E0308]: mismatched types error: aborting due to 7 previous errors -Some errors occurred: E0308, E0907. +Some errors occurred: E0308, E0698. For more information about an error, try `rustc --explain E0308`. diff --git a/futures/testcrate/ui/forget-ok.rs b/futures/testcrate/ui/forget-ok.rs index fac5f5410a..ae5a4d7baf 100644 --- a/futures/testcrate/ui/forget-ok.rs +++ b/futures/testcrate/ui/forget-ok.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro, generators, pin)] +#![feature(use_extern_macros, proc_macro_non_items, generators, pin)] extern crate futures; diff --git a/futures/testcrate/ui/missing-item.rs b/futures/testcrate/ui/missing-item.rs index 3417aa09e2..8968d4ddd6 100644 --- a/futures/testcrate/ui/missing-item.rs +++ b/futures/testcrate/ui/missing-item.rs @@ -1,5 +1,5 @@ #![allow(warnings)] -#![feature(proc_macro, generators, pin)] +#![feature(use_extern_macros, proc_macro_non_items, generators, pin)] extern crate futures; diff --git a/futures/testcrate/ui/move-captured-variable.rs b/futures/testcrate/ui/move-captured-variable.rs index 52d1d192b9..136a8d1a6d 100644 --- a/futures/testcrate/ui/move-captured-variable.rs +++ b/futures/testcrate/ui/move-captured-variable.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro, generators, pin)] +#![feature(use_extern_macros, proc_macro_non_items, generators, pin)] extern crate futures; diff --git a/futures/testcrate/ui/not-a-result.rs b/futures/testcrate/ui/not-a-result.rs index 9b962d810e..c88866ae76 100644 --- a/futures/testcrate/ui/not-a-result.rs +++ b/futures/testcrate/ui/not-a-result.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro, generators, pin)] +#![feature(use_extern_macros, proc_macro_non_items, generators, pin)] extern crate futures; diff --git a/futures/testcrate/ui/not-a-result.stderr b/futures/testcrate/ui/not-a-result.stderr index 7f95faff36..81bd089b30 100644 --- a/futures/testcrate/ui/not-a-result.stderr +++ b/futures/testcrate/ui/not-a-result.stderr @@ -7,6 +7,16 @@ error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied = help: the trait `futures::__rt::IsResult` is not implemented for `u32` = note: required by `futures::__rt::gen_future` +error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied + --> $DIR/not-a-result.rs:8:13 + | +8 | fn foo() -> u32 { + | ^^^ async functions must return a `Result` or a typedef of `Result` + | + = help: the trait `futures::__rt::IsResult` is not implemented for `u32` + = note: required because of the requirements on the impl of `futures::__rt::MyStableFuture` for `impl futures::__rt::MyStableFuture<<[static generator@$DIR/not-a-result.rs:7:1: 7:9 _] as std::ops::Generator>::Return>` + = note: the return type of a function must have a statically known size + error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied --> $DIR/not-a-result.rs:13:17 | @@ -36,6 +46,16 @@ error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied = help: the trait `futures::__rt::IsResult` is not implemented for `u32` = note: required by `futures::__rt::gen_stream` +error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied + --> $DIR/not-a-result.rs:18:14 + | +18 | fn foos() -> u32 { + | ^^^ async functions must return a `Result` or a typedef of `Result` + | + = help: the trait `futures::__rt::IsResult` is not implemented for `u32` + = note: required because of the requirements on the impl of `futures::__rt::MyStableStream` for `impl futures::__rt::MyStableStream::Return>` + = note: the return type of a function must have a statically known size + error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied --> $DIR/not-a-result.rs:23:18 | @@ -56,6 +76,6 @@ error[E0277]: the trait bound `u32: futures::__rt::IsResult` is not satisfied = help: the trait `futures::__rt::IsResult` is not implemented for `u32` = note: required by `futures::__rt::gen_stream` -error: aborting due to 6 previous errors +error: aborting due to 8 previous errors For more information about this error, try `rustc --explain E0277`. diff --git a/futures/testcrate/ui/type_error.rs b/futures/testcrate/ui/type_error.rs index 6d56591d38..526a3dc5f9 100644 --- a/futures/testcrate/ui/type_error.rs +++ b/futures/testcrate/ui/type_error.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro, generators, pin)] +#![feature(use_extern_macros, proc_macro_non_items, generators, pin)] extern crate futures; diff --git a/futures/testcrate/ui/unresolved-type.rs b/futures/testcrate/ui/unresolved-type.rs index d980a8a8dc..7a1add2738 100644 --- a/futures/testcrate/ui/unresolved-type.rs +++ b/futures/testcrate/ui/unresolved-type.rs @@ -1,4 +1,4 @@ -#![feature(proc_macro, generators, pin)] +#![feature(use_extern_macros, proc_macro_non_items, generators, pin)] extern crate futures; diff --git a/futures/testcrate/ui/unresolved-type.stderr b/futures/testcrate/ui/unresolved-type.stderr index da37f5757f..625dfe5824 100644 --- a/futures/testcrate/ui/unresolved-type.stderr +++ b/futures/testcrate/ui/unresolved-type.stderr @@ -5,6 +5,7 @@ error[E0412]: cannot find type `Left` in this scope | ^^^^ not found in this scope | = help: there is an enum variant `futures::future::Either::Left`, try using `futures::future::Either`? + = help: there is an enum variant `std::fmt::Alignment::Left`, try using `std::fmt::Alignment`? = help: there is an enum variant `std::fmt::rt::v1::Alignment::Left`, try using `std::fmt::rt::v1::Alignment`? error[E0412]: cannot find type `Left` in this scope @@ -14,9 +15,10 @@ error[E0412]: cannot find type `Left` in this scope | ^^^^ not found in this scope | = help: there is an enum variant `futures::future::Either::Left`, try using `futures::future::Either`? + = help: there is an enum variant `std::fmt::Alignment::Left`, try using `std::fmt::Alignment`? = help: there is an enum variant `std::fmt::rt::v1::Alignment::Left`, try using `std::fmt::rt::v1::Alignment`? -error[E0907]: type inside generator must be known in this context +error[E0698]: type inside generator must be known in this context --> $DIR/unresolved-type.rs:12:1 | 12 | #[async_stream(item = Left)] @@ -30,5 +32,5 @@ note: the type is part of the generator because of this `yield` error: aborting due to 3 previous errors -Some errors occurred: E0412, E0907. +Some errors occurred: E0412, E0698. For more information about an error, try `rustc --explain E0412`. diff --git a/futures/testcrate/ui/update-all-references.sh b/futures/testcrate/ui/update-all-references.sh old mode 100755 new mode 100644 diff --git a/futures/tests/async_await/pinned.rs b/futures/tests/async_await/pinned.rs index 0a3114fff9..d49a211d47 100644 --- a/futures/tests/async_await/pinned.rs +++ b/futures/tests/async_await/pinned.rs @@ -1,6 +1,7 @@ use futures::stable::block_on_stable; use futures::executor::{block_on, ThreadPool}; use futures::prelude::*; +use futures::prelude::await; #[async] fn foo() -> Result { diff --git a/futures/tests/async_await/smoke.rs b/futures/tests/async_await/smoke.rs index f3f1b28451..5acad8e7f2 100644 --- a/futures/tests/async_await/smoke.rs +++ b/futures/tests/async_await/smoke.rs @@ -12,6 +12,7 @@ use std::io; use futures::Never; use futures::future::poll_fn; use futures::prelude::*; +use futures::prelude::await; #[async] fn foo() -> Result { diff --git a/futures/tests/async_await_tests.rs b/futures/tests/async_await_tests.rs index 5d27629f39..844d91e8fa 100644 --- a/futures/tests/async_await_tests.rs +++ b/futures/tests/async_await_tests.rs @@ -1,4 +1,4 @@ -#![cfg_attr(feature = "nightly", feature(proc_macro, generators, pin))] +#![cfg_attr(feature = "nightly", feature(use_extern_macros, proc_macro_non_items, generators, pin))] extern crate futures; diff --git a/futures/tests/select_all.rs b/futures/tests/select_all.rs index a5e9e50dac..cc73bb8448 100644 --- a/futures/tests/select_all.rs +++ b/futures/tests/select_all.rs @@ -11,17 +11,20 @@ fn smoke() { ok(3), ]; + // This test depends on an implementation detail of `select_all` to match + // its behavior: it uses `swap_remove` under the hood. + let (i, idx, v) = block_on(select_all(v)).ok().unwrap(); assert_eq!(i, 1); assert_eq!(idx, 0); - let (i, idx, v) = block_on(select_all(v)).err().unwrap(); - assert_eq!(i, 2); - assert_eq!(idx, 0); - let (i, idx, v) = block_on(select_all(v)).ok().unwrap(); assert_eq!(i, 3); assert_eq!(idx, 0); + let (i, idx, v) = block_on(select_all(v)).err().unwrap(); + assert_eq!(i, 2); + assert_eq!(idx, 0); + assert!(v.is_empty()); } diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index 17ec5277ba..05f8e8970c 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -413,8 +413,8 @@ fn fanout_smoke() { #[test] fn fanout_backpressure() { - let (left_send, left_recv) = mpsc::channel(0); - let (right_send, right_recv) = mpsc::channel(0); + let (left_send, left_recv) = mpsc::channel(1); + let (right_send, right_recv) = mpsc::channel(1); let sink = left_send.fanout(right_send); let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index 25bf8b8110..56e478fd68 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -86,6 +86,28 @@ fn fold() { assert_done(|| err_list().fold(0, |a, b| ok::(a + b)), Err(3)); } +#[test] +fn for_each_concurrent() { + let (sender, receiver) = oneshot::channel::<()>(); + let (sender, receiver) = (&mut Some(sender), &mut Some(receiver)); + let fut = list().for_each_concurrent(move |num| { + match num { + // The first future is added + 1 => receiver.take().unwrap().map_err(|_| 0).left_future(), + // Second future is added and completes immediately + 2 => ok::<(), _>(()).right_future(), + // Third future is added, which when run completes the first + 3 => { + sender.take().unwrap().send(()).unwrap(); + ok::<(), _>(()).right_future() + } + _ => panic!(), + } + }).map(|_| ()); + + assert_done(|| fut, Ok(())); +} + #[test] fn filter() { assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2])); diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index 8081b51719..a608d818e1 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -28,11 +28,13 @@ fn works_1() { assert_eq!(Some(Ok(33)), stream.next()); assert_eq!(Some(Ok(99)), stream.next()); + mem::drop(b_tx); + c_tx.unbounded_send(42).unwrap(); assert_eq!(Some(Ok(42)), stream.next()); a_tx.unbounded_send(43).unwrap(); assert_eq!(Some(Ok(43)), stream.next()); - mem::drop((a_tx, b_tx, c_tx)); + mem::drop((a_tx, c_tx)); assert_eq!(None, stream.next()); } diff --git a/futures/yanked-redirect.html b/futures/yanked-redirect.html new file mode 100644 index 0000000000..8c07e4389d --- /dev/null +++ b/futures/yanked-redirect.html @@ -0,0 +1,4 @@ + +