diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000000..96e35d79f5 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,61 @@ +name: CI + +on: + pull_request: + push: + branches: + - master + - '[0-9]+.[0-9]+' + +env: + RUST_BACKTRACE: 1 + +defaults: + run: + shell: bash + +jobs: + test: + name: cargo +${{ matrix.rust }} test (${{ matrix.os }}) + strategy: + matrix: + include: + - rust: stable + os: ubuntu-latest + - rust: beta + os: ubuntu-latest + - rust: nightly + os: ubuntu-latest + - rust: nightly + os: macos-latest + - rust: nightly + os: windows-latest + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v2 + - name: Install Rust + # --no-self-update is necessary because the windows environment cannot self-update rustup.exe. + run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }} + - run: cargo install cargo-hack + - run: cargo hack build --feature-powerset --workspace + - run: cargo test --workspace + - run: cargo doc --workspace --no-deps + - run: cargo bench + if: startsWith(matrix.rust, 'nightly') + - run: cargo test --features nightly + if: startsWith(matrix.rust, 'nightly') + + msrv: + name: cargo +${{ matrix.rust }} build + strategy: + matrix: + rust: + # This is the minimum Rust version supported by futures 0.1. 
+ - 1.15.0 + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} + - run: cargo build + - run: cargo build --no-default-features diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index f3bf51c67f..0000000000 --- a/.travis.yml +++ /dev/null @@ -1,35 +0,0 @@ -language: rust - -matrix: - include: - - os: osx - - rust: stable - - rust: beta - - rust: nightly - env: BENCH=1 - before_script: - - pip install 'travis-cargo<0.2' --user && export PATH=$HOME/.local/bin:$PATH - after_success: - - travis-cargo doc-upload - - os: linux - rust: 1.15.0 - script: cargo test -sudo: false -script: - - cargo build - - cargo build --no-default-features - - cargo test - - cargo test --no-default-features --features use_std - - cargo test --manifest-path futures-cpupool/Cargo.toml - - cargo test --manifest-path futures-cpupool/Cargo.toml --no-default-features - - - cargo doc --no-deps - - cargo doc --no-deps --manifest-path futures-cpupool/Cargo.toml - - if [ "$BENCH" = "1" ]; then cargo bench; fi -env: - global: - - secure: "iwVcMVIF7ZSY82fK5UyyUvVvJxMSYrbZawh1+4Oi8pvOdYq1gptcDoOC8jxWwCwrNF1b+/85n+jlEUngEqqSmV5PjAbWPjoc+u4Zn7CRi1AlxoUlvHPiQm4vM4Mkkd6GsqoIZttCeedU9m/w0nQ18uUtK8uD6vr2FVdcMnUnkYQAxuGOowGLrwidukzfBXMCu/JrwKMIbt61knAFiI/KJknu0h1mRrhpeF/sQ3tJFzRRcQeFJkbfwDzltMpPo1hq5D3HI4ONjYi/qO2pwUhDk4umfp9cLW9MS8rQvptxJTQmWemHi+f2/U4ld6a0URL6kEuMkt/EbH0A74eFtlicfRs44dX9MlWoqbLypnC3ymqmHcpwcwNA3HmZyg800MTuU+BPK41HIPdO9tPpxjHEiqvNDknH7qs+YBnis0eH7DHJgEjXq651PjW7pm+rnHPwsj+OzKE1YBNxBQZZDkS3VnZJz+O4tVsOzc3IOz0e+lf7VVuI17C9haj117nKp3umC4MVBA0S8RfreFgqpyDeY2zwcqOr0YOlEGGRl0vyWP8Qcxx12kQ7+doLolt6Kxda4uO0hKRmIF6+qki1T+L7v8BOGOtCncz4f7IX48eQ7+Wu0OtglRn45qAa3CxjUuW6xX3KSNH66PCXV0Jtp8Ga2SSevX2wtbbFu9f+9R+PQY4=" - -notifications: - email: - on_success: never diff --git a/CHANGELOG.md b/CHANGELOG.md index 15b7d1166d..616282329e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 
+1,8 @@ +**Note**: This CHANGELOG is no longer maintained for newer 0.1.x releases. +See instead the github release tags and individual git commits. + +----- + # 0.1.17 - 2017-10-31 * Add a `close` method on `sink::Wait` @@ -234,7 +239,7 @@ Notable deprecations in the 0.1.4 release that will be deleted in an eventual * The `failed` constructor is now `err`. * The `done` constructor is now `result`. -As always, please report bugs to https://github.com/alexcrichton/futures-rs and +As always, please report bugs to https://github.com/rust-lang-nursery/futures-rs and we always love feedback! If you've got situations we don't cover, combinators you'd like to see, or slow code, please let us know! diff --git a/Cargo.toml b/Cargo.toml index c3e7760ecc..01dfd53f02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "futures" -version = "0.1.18" +version = "0.1.31" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" readme = "README.md" keywords = ["futures", "async", "future"] -repository = "https://github.com/alexcrichton/futures-rs" -homepage = "https://github.com/alexcrichton/futures-rs" +repository = "https://github.com/rust-lang-nursery/futures-rs" +homepage = "https://github.com/rust-lang-nursery/futures-rs" documentation = "https://docs.rs/futures" description = """ An implementation of futures and streams featuring zero allocations, @@ -14,16 +14,13 @@ composability, and iterator-like interfaces. 
""" categories = ["asynchronous"] -[badges] -travis-ci = { repository = "alexcrichton/futures-rs" } -appveyor = { repository = "alexcrichton/futures-rs" } - [dependencies] [features] +nightly = [] use_std = [] with-deprecated = [] default = ["use_std", "with-deprecated"] [workspace] -members = ["futures-cpupool"] +members = ["futures01", "futures-cpupool"] diff --git a/README.md b/README.md index de3b99c021..1c33d5ddbb 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,7 @@ This library is an implementation of **zero-cost futures** in Rust. -[![Build Status](https://travis-ci.org/alexcrichton/futures-rs.svg?branch=master)](https://travis-ci.org/alexcrichton/futures-rs) -[![Build status](https://ci.appveyor.com/api/projects/status/yl5w3ittk4kggfsh?svg=true)](https://ci.appveyor.com/project/alexcrichton/futures-rs) +[![Build Status](https://img.shields.io/github/workflow/status/rust-lang/futures-rs/CI/master)](https://github.com/rust-lang/futures-rs/actions) [![Crates.io](https://img.shields.io/crates/v/futures.svg?maxAge=2592000)](https://crates.io/crates/futures) [Documentation](https://docs.rs/futures) @@ -16,7 +15,7 @@ First, add this to your `Cargo.toml`: ```toml [dependencies] -futures = "0.1.17" +futures = "0.1.26" ``` Next, add this to your crate: @@ -39,7 +38,7 @@ a `#[no_std]` environment, use: ```toml [dependencies] -futures = { version = "0.1.17", default-features = false } +futures = { version = "0.1.26", default-features = false } ``` # License diff --git a/appveyor.yml b/appveyor.yml deleted file mode 100644 index b516f6084c..0000000000 --- a/appveyor.yml +++ /dev/null @@ -1,39 +0,0 @@ -environment: - - # At the time this was added AppVeyor was having troubles with checking - # revocation of SSL certificates of sites like static.rust-lang.org and what - # we think is crates.io. The libcurl HTTP client by default checks for - # revocation on Windows and according to a mailing list [1] this can be - # disabled. 
- # - # The `CARGO_HTTP_CHECK_REVOKE` env var here tells cargo to disable SSL - # revocation checking on Windows in libcurl. Note, though, that rustup, which - # we're using to download Rust here, also uses libcurl as the default backend. - # Unlike Cargo, however, rustup doesn't have a mechanism to disable revocation - # checking. To get rustup working we set `RUSTUP_USE_HYPER` which forces it to - # use the Hyper instead of libcurl backend. Both Hyper and libcurl use - # schannel on Windows but it appears that Hyper configures it slightly - # differently such that revocation checking isn't turned on by default. - # - # [1]: https://curl.haxx.se/mail/lib-2016-03/0202.html - RUSTUP_USE_HYPER: 1 - CARGO_HTTP_CHECK_REVOKE: false - - matrix: - - TARGET: x86_64-pc-windows-msvc -install: - - set PATH=C:\Program Files\Git\mingw64\bin;%PATH% - - curl -sSf -o rustup-init.exe https://win.rustup.rs/ - - rustup-init.exe -y --default-host %TARGET% - - set PATH=%PATH%;C:\Users\appveyor\.cargo\bin - - rustc -V - - cargo -V - -build: false - -test_script: - - cargo build - - cargo build --no-default-features - - cargo test - - cargo test --no-default-features --features use_std - - cargo test --manifest-path futures-cpupool/Cargo.toml diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index 34e4dd38c2..c0365c5fed 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -1,5 +1,6 @@ #![feature(test)] +#[macro_use] extern crate futures; extern crate test; @@ -106,7 +107,6 @@ impl Stream for TestSender { Err(_) => panic!(), Ok(AsyncSink::Ready) => { self.last += 1; - assert_eq!(Ok(Async::Ready(())), self.tx.poll_complete()); Ok(Async::Ready(Some(self.last))) } Ok(AsyncSink::NotReady(_)) => { diff --git a/ci/publish.sh b/ci/publish.sh new file mode 100755 index 0000000000..4d6eae069f --- /dev/null +++ b/ci/publish.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +set -euo pipefail +IFS=$'\n\t' + +# A list of paths to the crate to be published. 
+# It will be published in the order listed. +MEMBERS=( + "." + "futures01" +) + +cd "$(cd "$(dirname "$0")" && pwd)"/.. + +set -x + +for i in "${!MEMBERS[@]}"; do + ( + cd "${MEMBERS[${i}]}" + cargo publish + ) + if [[ $((i + 1)) != "${#MEMBERS[@]}" ]]; then + sleep 45 + fi +done diff --git a/futures-cpupool/Cargo.toml b/futures-cpupool/Cargo.toml index fdeb541896..fee4c0799f 100644 --- a/futures-cpupool/Cargo.toml +++ b/futures-cpupool/Cargo.toml @@ -3,8 +3,8 @@ name = "futures-cpupool" version = "0.1.8" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" -repository = "https://github.com/alexcrichton/futures-rs" -homepage = "https://github.com/alexcrichton/futures-rs" +repository = "https://github.com/rust-lang-nursery/futures-rs" +homepage = "https://github.com/rust-lang-nursery/futures-rs" documentation = "https://docs.rs/futures-cpupool" description = """ An implementation of thread pools which hand out futures to the results of the diff --git a/futures-cpupool/README.md b/futures-cpupool/README.md index b022994fb5..6950bc4622 100644 --- a/futures-cpupool/README.md +++ b/futures-cpupool/README.md @@ -3,8 +3,7 @@ A library for creating futures representing work happening concurrently on a dedicated thread pool. 
-[![Build Status](https://travis-ci.org/alexcrichton/futures-rs.svg?branch=master)](https://travis-ci.org/alexcrichton/futures-rs) -[![Build status](https://ci.appveyor.com/api/projects/status/yl5w3ittk4kggfsh?svg=true)](https://ci.appveyor.com/project/alexcrichton/futures-rs) +[![Build Status](https://img.shields.io/github/workflow/status/rust-lang/futures-rs/CI/master)](https://github.com/rust-lang/futures-rs/actions) [Documentation](https://docs.rs/futures-cpupool) diff --git a/futures-cpupool/src/lib.rs b/futures-cpupool/src/lib.rs index 0614368ba3..53e48699eb 100644 --- a/futures-cpupool/src/lib.rs +++ b/futures-cpupool/src/lib.rs @@ -36,6 +36,7 @@ #![deny(missing_docs)] #![deny(missing_debug_implementations)] +#![allow(bare_trait_objects, unknown_lints)] extern crate futures; extern crate num_cpus; @@ -50,6 +51,7 @@ use std::fmt; use futures::{IntoFuture, Future, Poll, Async}; use futures::future::{lazy, Executor, ExecuteError}; use futures::sync::oneshot::{channel, Sender, Receiver}; +#[allow(deprecated)] use futures::executor::{self, Run, Executor as OldExecutor}; /// A thread pool intended to run CPU intensive work. 
@@ -132,6 +134,7 @@ pub struct CpuFuture { } enum Message { + #[allow(deprecated)] Run(Run), Close, } @@ -211,6 +214,7 @@ impl CpuPool { tx: Some(tx), keep_running_flag: keep_running_flag.clone(), }; + #[allow(deprecated)] executor::spawn(sender).execute(self.inner.clone()); CpuFuture { inner: rx , keep_running_flag: keep_running_flag.clone() } } @@ -239,6 +243,7 @@ impl Executor for CpuPool where F: Future + Send + 'static, { fn execute(&self, future: F) -> Result<(), ExecuteError> { + #[allow(deprecated)] executor::spawn(future).execute(self.inner.clone()); Ok(()) } @@ -279,6 +284,7 @@ impl Drop for CpuPool { } } +#[allow(deprecated)] impl OldExecutor for Inner { fn execute(&self, run: Run) { self.send(Message::Run(run)) diff --git a/futures-cpupool/tests/smoke.rs b/futures-cpupool/tests/smoke.rs index 1b267f2f02..fbf27c7744 100644 --- a/futures-cpupool/tests/smoke.rs +++ b/futures-cpupool/tests/smoke.rs @@ -1,7 +1,11 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; extern crate futures_cpupool; -use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; +use std::sync::atomic::{AtomicUsize, Ordering}; +#[allow(deprecated)] +use std::sync::atomic::ATOMIC_USIZE_INIT; use std::thread; use std::time::Duration; @@ -36,6 +40,7 @@ fn select() { #[test] fn threads_go_away() { + #[allow(deprecated)] static CNT: AtomicUsize = ATOMIC_USIZE_INIT; struct A; @@ -66,7 +71,9 @@ fn threads_go_away() { #[test] fn lifecycle_test() { + #[allow(deprecated)] static NUM_STARTS: AtomicUsize = ATOMIC_USIZE_INIT; + #[allow(deprecated)] static NUM_STOPS: AtomicUsize = ATOMIC_USIZE_INIT; fn after_start() { diff --git a/futures01/Cargo.toml b/futures01/Cargo.toml new file mode 100644 index 0000000000..05e3e26f6e --- /dev/null +++ b/futures01/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "futures01" +version = "0.1.31" +authors = ["Alex Crichton "] +license = "MIT/Apache-2.0" +repository = "https://github.com/rust-lang-nursery/futures-rs" 
+homepage = "https://github.com/rust-lang-nursery/futures-rs" +documentation = "https://docs.rs/futures" +description = """ +An implementation of futures and streams featuring zero allocations, +composability, and iterator-like interfaces. + +Note that this crate tracks permanently to the 0.1 branch of +the `futures` crate. +""" + +[dependencies.futures] +path = ".." +version = "0.1.31" +default-features = false + +[features] +default = ["use_std", "with-deprecated"] +nightly = ["futures/nightly"] +use_std = ["futures/use_std"] +with-deprecated = ["futures/with-deprecated"] diff --git a/futures01/src/lib.rs b/futures01/src/lib.rs new file mode 100644 index 0000000000..e90bb2037d --- /dev/null +++ b/futures01/src/lib.rs @@ -0,0 +1,164 @@ +//! Zero-cost Futures in Rust +//! +//! This library is an implementation of futures in Rust which aims to provide +//! a robust implementation of handling asynchronous computations, ergonomic +//! composition and usage, and zero-cost abstractions over what would otherwise +//! be written by hand. +//! +//! Futures are a concept for an object which is a proxy for another value that +//! may not be ready yet. For example issuing an HTTP request may return a +//! future for the HTTP response, as it probably hasn't arrived yet. With an +//! object representing a value that will eventually be available, futures allow +//! for powerful composition of tasks through basic combinators that can perform +//! operations like chaining computations, changing the types of futures, or +//! waiting for two futures to complete at the same time. +//! +//! You can find extensive tutorials and documentations at [https://tokio.rs] +//! for both this crate (asynchronous programming in general) as well as the +//! Tokio stack to perform async I/O with. +//! +//! [https://tokio.rs]: https://tokio.rs +//! +//! ## Installation +//! +//! Add this to your `Cargo.toml`: +//! +//! ```toml +//! [dependencies] +//! futures = "0.1" +//! ``` +//! +//! 
## Examples +//! +//! Let's take a look at a few examples of how futures might be used: +//! +//! ``` +//! extern crate futures; +//! +//! use std::io; +//! use std::time::Duration; +//! use futures::prelude::*; +//! use futures::future::Map; +//! +//! // A future is actually a trait implementation, so we can generically take a +//! // future of any integer and return back a future that will resolve to that +//! // value plus 10 more. +//! // +//! // Note here that like iterators, we're returning the `Map` combinator in +//! // the futures crate, not a boxed abstraction. This is a zero-cost +//! // construction of a future. +//! fn add_ten(future: F) -> Map i32> +//! where F: Future, +//! { +//! fn add(a: i32) -> i32 { a + 10 } +//! future.map(add) +//! } +//! +//! // Not only can we modify one future, but we can even compose them together! +//! // Here we have a function which takes two futures as input, and returns a +//! // future that will calculate the sum of their two values. +//! // +//! // Above we saw a direct return value of the `Map` combinator, but +//! // performance isn't always critical and sometimes it's more ergonomic to +//! // return a trait object like we do here. Note though that there's only one +//! // allocation here, not any for the intermediate futures. +//! fn add<'a, A, B>(a: A, b: B) -> Box + 'a> +//! where A: Future + 'a, +//! B: Future + 'a, +//! { +//! Box::new(a.join(b).map(|(a, b)| a + b)) +//! } +//! +//! // Futures also allow chaining computations together, starting another after +//! // the previous finishes. Here we wait for the first computation to finish, +//! // and then decide what to do depending on the result. +//! fn download_timeout(url: &str, +//! timeout_dur: Duration) +//! -> Box, Error=io::Error>> { +//! use std::io; +//! use std::net::{SocketAddr, TcpStream}; +//! +//! type IoFuture = Box>; +//! +//! // First thing to do is we need to resolve our URL to an address. This +//! 
// will likely perform a DNS lookup which may take some time. +//! let addr = resolve(url); +//! +//! // After we acquire the address, we next want to open up a TCP +//! // connection. +//! let tcp = addr.and_then(|addr| connect(&addr)); +//! +//! // After the TCP connection is established and ready to go, we're off to +//! // the races! +//! let data = tcp.and_then(|conn| download(conn)); +//! +//! // That all might take awhile, though, so let's not wait too long for it +//! // to all come back. The `select` combinator here returns a future which +//! // resolves to the first value that's ready plus the next future. +//! // +//! // Note we can also use the `then` combinator which is similar to +//! // `and_then` above except that it receives the result of the +//! // computation, not just the successful value. +//! // +//! // Again note that all the above calls to `and_then` and the below calls +//! // to `map` and such require no allocations. We only ever allocate once +//! // we hit the `Box::new()` call at the end here, which means we've built +//! // up a relatively involved computation with only one box, and even that +//! // was optional! +//! +//! let data = data.map(Ok); +//! let timeout = timeout(timeout_dur).map(Err); +//! +//! let ret = data.select(timeout).then(|result| { +//! match result { +//! // One future succeeded, and it was the one which was +//! // downloading data from the connection. +//! Ok((Ok(data), _other_future)) => Ok(data), +//! +//! // The timeout fired, and otherwise no error was found, so +//! // we translate this to an error. +//! Ok((Err(_timeout), _other_future)) => { +//! Err(io::Error::new(io::ErrorKind::Other, "timeout")) +//! } +//! +//! // A normal I/O error happened, so we pass that on through. +//! Err((e, _other_future)) => Err(e), +//! } +//! }); +//! return Box::new(ret); +//! +//! fn resolve(url: &str) -> IoFuture { +//! // ... +//! # panic!("unimplemented"); +//! } +//! +//! 
fn connect(hostname: &SocketAddr) -> IoFuture { +//! // ... +//! # panic!("unimplemented"); +//! } +//! +//! fn download(stream: TcpStream) -> IoFuture> { +//! // ... +//! # panic!("unimplemented"); +//! } +//! +//! fn timeout(stream: Duration) -> IoFuture<()> { +//! // ... +//! # panic!("unimplemented"); +//! } +//! } +//! # fn main() {} +//! ``` +//! +//! Some more information can also be found in the [README] for now, but +//! otherwise feel free to jump in to the docs below! +//! +//! [README]: https://github.com/rust-lang-nursery/futures-rs#futures-rs + +#![no_std] +#![doc(html_root_url = "https://docs.rs/futures/0.1")] + +extern crate futures; + +#[doc(inline)] +pub use futures::*; diff --git a/src/executor.rs b/src/executor.rs index b6b6d422a8..365642f770 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -8,6 +8,7 @@ //! [online]: https://tokio.rs/docs/going-deeper-futures/tasks/ #[allow(deprecated)] +#[doc(hidden)] #[cfg(feature = "use_std")] pub use task_impl::{Unpark, Executor, Run}; diff --git a/src/future/either.rs b/src/future/either.rs index f8c47f10eb..253f26784c 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -1,4 +1,4 @@ -use {Future, Poll}; +use {Future, Poll, Stream}; /// Combines two different futures yielding the same item and error /// types into a single type. @@ -37,3 +37,18 @@ impl Future for Either } } } + +impl Stream for Either + where A: Stream, + B: Stream +{ + type Item = A::Item; + type Error = A::Error; + + fn poll(&mut self) -> Poll, A::Error> { + match *self { + Either::A(ref mut a) => a.poll(), + Either::B(ref mut b) => b.poll(), + } + } +} diff --git a/src/future/fuse.rs b/src/future/fuse.rs index e37eace0c1..05ad3d5afa 100644 --- a/src/future/fuse.rs +++ b/src/future/fuse.rs @@ -19,6 +19,18 @@ pub fn new(f: A) -> Fuse { } } +impl Fuse { + /// Returns whether the underlying future has finished or not. 
+ /// + /// If this method returns `true`, then all future calls to `poll` + /// are guaranteed to return `Ok(Async::NotReady)`. If this returns + /// false, then the underlying future has not been driven to + /// completion. + pub fn is_done(&self) -> bool { + self.future.is_none() + } +} + impl Future for Fuse { type Item = A::Item; type Error = A::Error; diff --git a/src/future/mod.rs b/src/future/mod.rs index 7cccd907b0..9867765902 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -102,7 +102,7 @@ if_std! { #[doc(hidden)] #[deprecated(note = "removed without replacement, recommended to use a \ local extension trait or function if needed, more \ - details in https://github.com/alexcrichton/futures-rs/issues/228")] + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] pub type BoxFuture = ::std::boxed::Box + Send>; impl Future for ::std::boxed::Box { @@ -174,6 +174,7 @@ use {Poll, stream}; /// More information about combinators can be found [on tokio.rs]. /// /// [on tokio.rs]: https://tokio.rs/docs/going-deeper-futures/futures-mechanics/ +#[must_use = "futures do nothing unless polled"] pub trait Future { /// The type of value that this future will resolved with if it is /// successful. 
@@ -323,7 +324,7 @@ pub trait Future { #[doc(hidden)] #[deprecated(note = "removed without replacement, recommended to use a \ local extension trait or function if needed, more \ - details in https://github.com/alexcrichton/futures-rs/issues/228")] + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] #[allow(deprecated)] fn boxed(self) -> BoxFuture where Self: Sized + Send + 'static diff --git a/src/future/shared.rs b/src/future/shared.rs index 3f7e3afd27..e3b6d2fca7 100644 --- a/src/future/shared.rs +++ b/src/future/shared.rs @@ -59,9 +59,8 @@ struct Notifier { const IDLE: usize = 0; const POLLING: usize = 1; -const REPOLL: usize = 2; -const COMPLETE: usize = 3; -const POISONED: usize = 4; +const COMPLETE: usize = 2; +const POISONED: usize = 3; pub fn new(future: F) -> Shared { Shared { @@ -133,7 +132,7 @@ impl Future for Shared IDLE => { // Lock acquired, fall through } - POLLING | REPOLL => { + POLLING => { // Another task is currently polling, at this point we just want // to ensure that our task handle is currently registered @@ -146,56 +145,45 @@ impl Future for Shared _ => unreachable!(), } - loop { - struct Reset<'a>(&'a AtomicUsize); + struct Reset<'a>(&'a AtomicUsize); - impl<'a> Drop for Reset<'a> { - fn drop(&mut self) { - use std::thread; + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + use std::thread; - if thread::panicking() { - self.0.store(POISONED, SeqCst); - } + if thread::panicking() { + self.0.store(POISONED, SeqCst); } } + } - let _reset = Reset(&self.inner.notifier.state); - - // Poll the future - let res = unsafe { - (*self.inner.future.get()).as_mut().unwrap() - .poll_future_notify(&self.inner.notifier, 0) - }; - match res { - Ok(Async::NotReady) => { - // Not ready, try to release the handle - match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) { - POLLING => { - // Success - return Ok(Async::NotReady); - } - REPOLL => { - // Gotta poll again! 
- let prev = self.inner.notifier.state.swap(POLLING, SeqCst); - assert_eq!(prev, REPOLL); - } - _ => unreachable!(), + let _reset = Reset(&self.inner.notifier.state); + + // Poll the future + let res = unsafe { + (*self.inner.future.get()).as_mut().unwrap() + .poll_future_notify(&self.inner.notifier, 0) + }; + match res { + Ok(Async::NotReady) => { + // Not ready, try to release the handle + match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) { + POLLING => { + // Success + return Ok(Async::NotReady); } - + _ => unreachable!(), } - Ok(Async::Ready(i)) => { - unsafe { - (*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) })); - } - break; + } + Ok(Async::Ready(i)) => { + unsafe { + (*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) })); } - Err(e) => { - unsafe { - (*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) })); - } - - break; + } + Err(e) => { + unsafe { + (*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) })); } } } @@ -225,8 +213,6 @@ impl Drop for Shared where F: Future { impl Notify for Notifier { fn notify(&self, _id: usize) { - self.state.compare_and_swap(POLLING, REPOLL, SeqCst); - let waiters = mem::replace(&mut *self.waiters.lock().unwrap(), HashMap::new()); for (_, waiter) in waiters { @@ -235,8 +221,20 @@ impl Notify for Notifier { } } -unsafe impl Sync for Inner {} -unsafe impl Send for Inner {} +// The `F` is synchronized by a lock, so `F` doesn't need +// to be `Sync`. However, its `Item` or `Error` are exposed +// through an `Arc` but not lock, so they must be `Send + Sync`. 
+unsafe impl Send for Inner + where F: Future + Send, + F::Item: Send + Sync, + F::Error: Send + Sync, +{} + +unsafe impl Sync for Inner + where F: Future + Send, + F::Item: Send + Sync, + F::Error: Send + Sync, +{} impl fmt::Debug for Inner where F: Future + fmt::Debug, @@ -290,10 +288,12 @@ impl fmt::Display for SharedError impl error::Error for SharedError where E: error::Error, { + #[allow(deprecated)] fn description(&self) -> &str { self.error.description() } + #[allow(deprecated)] fn cause(&self) -> Option<&error::Error> { self.error.cause() } diff --git a/src/lib.rs b/src/lib.rs index ac27d3bc5f..ccadb6777f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,10 +153,11 @@ //! Some more information can also be found in the [README] for now, but //! otherwise feel free to jump in to the docs below! //! -//! [README]: https://github.com/alexcrichton/futures-rs#futures-rs +//! [README]: https://github.com/rust-lang-nursery/futures-rs#futures-rs #![no_std] #![deny(missing_docs, missing_debug_implementations)] +#![allow(bare_trait_objects, unknown_lints)] #![doc(html_root_url = "https://docs.rs/futures/0.1")] #[macro_use] diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs index 034d571cb2..419579d9a0 100644 --- a/src/sink/buffer.rs +++ b/src/sink/buffer.rs @@ -49,9 +49,6 @@ impl Buffer { if let AsyncSink::NotReady(item) = self.sink.start_send(item)? { self.buf.push_front(item); - // ensure that we attempt to complete any pushes we've started - self.sink.poll_complete()?; - return Ok(Async::NotReady); } } diff --git a/src/sink/from_err.rs b/src/sink/from_err.rs index 92c218fe46..4880c30ef4 100644 --- a/src/sink/from_err.rs +++ b/src/sink/from_err.rs @@ -5,7 +5,7 @@ use {Sink, Poll, StartSend}; /// A sink combinator to change the error type of a sink. /// /// This is created by the `Sink::from_err` method. 
-#[derive(Debug)] +#[derive(Clone, Debug)] #[must_use = "futures do nothing unless polled"] pub struct SinkFromErr { sink: S, diff --git a/src/sink/map_err.rs b/src/sink/map_err.rs index cccad399e5..25c168c071 100644 --- a/src/sink/map_err.rs +++ b/src/sink/map_err.rs @@ -3,7 +3,7 @@ use sink::Sink; use {Poll, StartSend, Stream}; /// Sink for the `Sink::sink_map_err` combinator. -#[derive(Debug)] +#[derive(Clone,Debug)] #[must_use = "sinks do nothing unless polled"] pub struct SinkMapErr { sink: S, diff --git a/src/sink/with.rs b/src/sink/with.rs index b0d5c54c65..3326b6e49c 100644 --- a/src/sink/with.rs +++ b/src/sink/with.rs @@ -7,7 +7,7 @@ use stream::Stream; /// Sink for the `Sink::with` combinator, chaining a computation to run *prior* /// to pushing a value into the underlying sink. -#[derive(Debug)] +#[derive(Clone, Debug)] #[must_use = "sinks do nothing unless polled"] pub struct With where S: Sink, @@ -20,7 +20,7 @@ pub struct With _phantom: PhantomData, } -#[derive(Debug)] +#[derive(Clone, Debug)] enum State { Empty, Process(Fut), diff --git a/src/stream/chunks.rs b/src/stream/chunks.rs index 3a361eb6f2..dbfaeb89ec 100644 --- a/src/stream/chunks.rs +++ b/src/stream/chunks.rs @@ -16,7 +16,8 @@ pub struct Chunks { items: Vec, err: Option, - stream: Fuse + stream: Fuse, + cap: usize, // https://github.com/rust-lang-nursery/futures-rs/issues/1475 } pub fn new(s: S, capacity: usize) -> Chunks @@ -28,6 +29,7 @@ pub fn new(s: S, capacity: usize) -> Chunks items: Vec::with_capacity(capacity), err: None, stream: super::fuse::new(s), + cap: capacity, } } @@ -54,7 +56,7 @@ impl ::sink::Sink for Chunks impl Chunks where S: Stream { fn take(&mut self) -> Vec { - let cap = self.items.capacity(); + let cap = self.cap; mem::replace(&mut self.items, Vec::with_capacity(cap)) } @@ -93,7 +95,6 @@ impl Stream for Chunks return Err(err) } - let cap = self.items.capacity(); loop { match self.stream.poll() { Ok(Async::NotReady) => return Ok(Async::NotReady), @@ 
-103,7 +104,7 @@ impl Stream for Chunks // the full one. Ok(Async::Ready(Some(item))) => { self.items.push(item); - if self.items.len() >= cap { + if self.items.len() >= self.cap { return Ok(Some(self.take()).into()) } } diff --git a/src/stream/concat.rs b/src/stream/concat.rs index aae5623efa..a0da71bdd5 100644 --- a/src/stream/concat.rs +++ b/src/stream/concat.rs @@ -9,7 +9,7 @@ use stream::Stream; /// A stream combinator to concatenate the results of a stream into the first /// yielded item. /// -/// This structure is produced by the `Stream::concat` method. +/// This structure is produced by the `Stream::concat2` method. #[must_use = "streams do nothing unless polled"] pub struct Concat2 where S: Stream, @@ -169,4 +169,4 @@ enum Inner { First, Extending(E), Done, -} \ No newline at end of file +} diff --git a/src/stream/forward.rs b/src/stream/forward.rs index 2ecde10039..6722af8c20 100644 --- a/src/stream/forward.rs +++ b/src/stream/forward.rs @@ -65,7 +65,7 @@ impl Forward fn try_start_send(&mut self, item: T::Item) -> Poll<(), U::SinkError> { debug_assert!(self.buffered.is_none()); if let AsyncSink::NotReady(item) = self.sink_mut() - .take().expect("Attempted to poll Forward after completion") + .expect("Attempted to poll Forward after completion") .start_send(item)? { self.buffered = Some(item); @@ -91,17 +91,17 @@ impl Future for Forward } loop { - match self.stream_mut() - .take().expect("Attempted to poll Forward after completion") + match self.stream.as_mut() + .expect("Attempted to poll Forward after completion") .poll()? 
{ Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)), Async::Ready(None) => { - try_ready!(self.sink_mut().take().expect("Attempted to poll Forward after completion").close()); + try_ready!(self.sink_mut().expect("Attempted to poll Forward after completion").close()); return Ok(Async::Ready(self.take_result())) } Async::NotReady => { - try_ready!(self.sink_mut().take().expect("Attempted to poll Forward after completion").poll_complete()); + try_ready!(self.sink_mut().expect("Attempted to poll Forward after completion").poll_complete()); return Ok(Async::NotReady) } } diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index 64e2d6f9e5..561bbb5189 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -114,6 +114,12 @@ pub fn futures_ordered(futures: I) -> FuturesOrdered<: return queue } +impl Default for FuturesOrdered where T: Future { + fn default() -> Self { + FuturesOrdered::new() + } +} + impl FuturesOrdered where T: Future { diff --git a/src/stream/futures_unordered.rs b/src/stream/futures_unordered.rs index 2940fd3495..3f25c86f39 100644 --- a/src/stream/futures_unordered.rs +++ b/src/stream/futures_unordered.rs @@ -113,6 +113,12 @@ enum Dequeue { Inconsistent, } +impl Default for FuturesUnordered where T: Future { + fn default() -> Self { + FuturesUnordered::new() + } +} + impl FuturesUnordered where T: Future, { @@ -268,6 +274,26 @@ impl Stream for FuturesUnordered type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { + // Variable to determine how many times it is allowed to poll underlying + // futures without yielding. + // + // A single call to `poll_next` may potentially do a lot of work before + // yielding. This happens in particular if the underlying futures are awoken + // frequently but continue to return `Pending`. This is problematic if other + // tasks are waiting on the executor, since they do not get to run. 
This value + // caps the number of calls to `poll` on underlying futures a single call to + // `poll_next` is allowed to make. + // + // The value is the length of FuturesUnordered. This ensures that each + // future is polled only once at most per iteration. + // + // See also https://github.com/rust-lang/futures-rs/issues/2047. + let yield_every = self.len(); + + // Keep track of how many child futures we have polled, + // in case we want to forcibly yield. + let mut polled = 0; + // Ensure `parent` is correctly set. self.inner.parent.register(); @@ -363,12 +389,21 @@ impl Stream for FuturesUnordered future.poll() }) }; + polled += 1; let ret = match res { Ok(Async::NotReady) => { let node = bomb.node.take().unwrap(); *node.future.get() = Some(future); bomb.queue.link(node); + + if polled == yield_every { + // We have polled a large number of futures in a row without yielding. + // To ensure we do not starve other tasks waiting on the executor, + // we yield here, but immediately wake ourselves up to continue. + task_impl::current().notify(); + return Ok(Async::NotReady); + } continue } Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))), diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 566d143b94..9ad4eea45c 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -128,7 +128,7 @@ if_std! { #[doc(hidden)] #[deprecated(note = "removed without replacement, recommended to use a \ local extension trait or function if needed, more \ - details in https://github.com/alexcrichton/futures-rs/issues/228")] + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] pub type BoxStream = ::std::boxed::Box + Send>; impl Stream for ::std::boxed::Box { @@ -179,7 +179,8 @@ if_std! { /// entirely. If one of these use cases suits you perfectly and not the other, /// please feel welcome to comment on [the issue][being considered]! 
/// -/// [being considered]: https://github.com/alexcrichton/futures-rs/issues/206 +/// [being considered]: https://github.com/rust-lang-nursery/futures-rs/issues/206 +#[must_use = "streams do nothing unless polled"] pub trait Stream { /// The type of item this stream will yield on success. type Item; @@ -271,7 +272,7 @@ pub trait Stream { #[doc(hidden)] #[deprecated(note = "removed without replacement, recommended to use a \ local extension trait or function if needed, more \ - details in https://github.com/alexcrichton/futures-rs/issues/228")] + details in https://github.com/rust-lang-nursery/futures-rs/issues/228")] #[allow(deprecated)] fn boxed(self) -> BoxStream where Self: Sized + Send + 'static, @@ -398,7 +399,7 @@ pub trait Stream { /// /// let (_tx, rx) = mpsc::channel::(1); /// let evens_plus_one = rx.filter_map(|x| { - /// if x % 0 == 2 { + /// if x % 2 == 0 { /// Some(x + 1) /// } else { /// None diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs index 7187579666..31d2320ab6 100644 --- a/src/sync/mpsc/mod.rs +++ b/src/sync/mpsc/mod.rs @@ -600,6 +600,11 @@ impl Sender { Ok(self.poll_unparked(true)) } + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + !decode_state(self.inner.state.load(SeqCst)).is_open + } + fn poll_unparked(&mut self, do_park: bool) -> Async<()> { // First check the `maybe_parked` variable. This avoids acquiring the // lock in most cases @@ -649,7 +654,14 @@ impl Sink for Sender { } fn poll_complete(&mut self) -> Poll<(), SendError> { - Ok(Async::Ready(())) + self.poll_ready() + // At this point, the value cannot be returned and `SendError` + // cannot be created with a `T` without breaking backwards + // compatibility. This means we cannot return an error. + // + // That said, there is also no guarantee that a `poll_complete` + // returning `Ok` implies the receiver sees the message. 
+ .or_else(|_| Ok(().into())) } fn close(&mut self) -> Poll<(), SendError> { @@ -658,6 +670,11 @@ impl Sink for Sender { } impl UnboundedSender { + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.is_closed() + } + /// Sends the provided message along this channel. /// /// This is an unbounded sender, so this function differs from `Sink::send` @@ -813,6 +830,12 @@ impl Receiver { loop { match unsafe { self.inner.message_queue.pop() } { PopResult::Data(msg) => { + // If there are any parked task handles in the parked queue, + // pop one and unpark it. + self.unpark_one(); + // Decrement number of messages + self.dec_num_messages(); + return Async::Ready(msg); } PopResult::Empty => { @@ -863,7 +886,7 @@ impl Receiver { let state = decode_state(curr); // If the channel is closed, then there is no need to park. - if !state.is_open && state.num_messages == 0 { + if state.is_closed() { return TryPark::Closed; } @@ -904,8 +927,8 @@ impl Stream for Receiver { fn poll(&mut self) -> Poll, ()> { loop { // Try to read a message off of the message queue. - let msg = match self.next_message() { - Async::Ready(msg) => msg, + match self.next_message() { + Async::Ready(msg) => return Ok(Async::Ready(msg)), Async::NotReady => { // There are no messages to read, in this case, attempt to // park. The act of parking will verify that the channel is @@ -929,17 +952,7 @@ impl Stream for Receiver { } } } - }; - - // If there are any parked task handles in the parked queue, pop - // one and unpark it. - self.unpark_one(); - - // Decrement number of messages - self.dec_num_messages(); - - // Return the message - return Ok(Async::Ready(msg)); + } } } } @@ -948,8 +961,27 @@ impl Drop for Receiver { fn drop(&mut self) { // Drain the channel of all pending messages self.close(); - while self.next_message().is_ready() { - // ... 
+ + loop { + match self.next_message() { + Async::Ready(_) => {} + Async::NotReady => { + let curr = self.inner.state.load(SeqCst); + let state = decode_state(curr); + + // If the channel is closed, then there is no need to park. + if state.is_closed() { + return; + } + + // TODO: Spinning isn't ideal, it might be worth + // investigating using a condvar or some other strategy + // here. That said, if this case is hit, then another thread + // is about to push the value into the queue and this isn't + // the only spinlock in the impl right now. + thread::yield_now(); + } + } } } } @@ -1125,6 +1157,12 @@ impl Inner { unsafe impl Send for Inner {} unsafe impl Sync for Inner {} +impl State { + fn is_closed(&self) -> bool { + !self.is_open && self.num_messages == 0 + } +} + /* * * ===== Helpers ===== diff --git a/src/sync/oneshot.rs b/src/sync/oneshot.rs index d95883d3ba..3a9d8efdca 100644 --- a/src/sync/oneshot.rs +++ b/src/sync/oneshot.rs @@ -206,7 +206,7 @@ impl Inner { // under the hood. If it instead used `Release` / `Acquire` ordering, // then it would not necessarily synchronize with `inner.complete` // and deadlock might be possible, as was observed in - // https://github.com/alexcrichton/futures-rs/pull/219. + // https://github.com/rust-lang-nursery/futures-rs/pull/219. self.complete.store(true, SeqCst); if let Some(mut slot) = self.rx_task.try_lock() { if let Some(task) = slot.take() { @@ -228,6 +228,23 @@ impl Inner { } } + fn try_recv(&self) -> Result, Canceled> { + // If we're complete, either `::close_rx` or `::drop_tx` was called. + // We can assume a successful send if data is present. + if self.complete.load(SeqCst) { + if let Some(mut slot) = self.data.try_lock() { + if let Some(data) = slot.take() { + return Ok(Some(data.into())); + } + } + // Should there be a different error value or a panic in the case + // where `self.data.try_lock() == None`? 
+ Err(Canceled) + } else { + Ok(None) + } + } + fn recv(&self) -> Poll { let mut done = false; @@ -346,6 +363,9 @@ impl Sender { /// within the context of a task. In other words, this should only ever be /// called from inside another future. /// + /// If `Ok(Ready)` is returned then the associated `Receiver` has been + /// dropped, which means any work required for sending should be canceled. + /// /// If you're calling this function from a context that does not have a /// task, then you can use the `is_canceled` API instead. pub fn poll_cancel(&mut self) -> Poll<(), ()> { @@ -403,6 +423,21 @@ impl Receiver { pub fn close(&mut self) { self.inner.close_rx() } + + /// Attempts to receive a message outside of the context of a task. + /// + /// Useful when a [`Context`](Context) is not available such as within a + /// `Drop` impl. + /// + /// Does not schedule a task wakeup or have any other side effects. + /// + /// A return value of `None` must be considered immediately stale (out of + /// date) unless [`::close`](Receiver::close) has been called first. + /// + /// Returns an error if the sender was dropped. 
+ pub fn try_recv(&mut self) -> Result, Canceled> { + self.inner.try_recv() + } } impl Future for Receiver { diff --git a/src/task.rs b/src/task.rs index 941f227dd3..f83f2c4719 100644 --- a/src/task.rs +++ b/src/task.rs @@ -33,7 +33,7 @@ #[allow(deprecated)] pub use task_impl::{Spawn, spawn, Unpark, Executor, Run, park}; -pub use task_impl::{Task, AtomicTask, current, init}; +pub use task_impl::{Task, AtomicTask, current, init, is_in_task}; #[allow(deprecated)] #[cfg(feature = "use_std")] diff --git a/src/task_impl/atomic_task.rs b/src/task_impl/atomic_task.rs index a89e20c70f..d73954e617 100644 --- a/src/task_impl/atomic_task.rs +++ b/src/task_impl/atomic_task.rs @@ -1,11 +1,9 @@ -#![allow(dead_code)] - use super::Task; use core::fmt; use core::cell::UnsafeCell; use core::sync::atomic::AtomicUsize; -use core::sync::atomic::Ordering::{Acquire, Release}; +use core::sync::atomic::Ordering::{Acquire, Release, AcqRel}; /// A synchronization primitive for task notification. /// @@ -31,32 +29,110 @@ pub struct AtomicTask { task: UnsafeCell>, } -/// Initial state, the `AtomicTask` is currently not being used. -/// -/// The value `2` is picked specifically because it between the write lock & -/// read lock values. Since the read lock is represented by an incrementing -/// counter, this enables an atomic fetch_sub operation to be used for releasing -/// a lock. -const WAITING: usize = 2; - -/// The `register` function has determined that the task is no longer current. -/// This implies that `AtomicTask::register` is being called from a different -/// task than is represented by the currently stored task. The write lock is -/// obtained to update the task cell. -const LOCKED_WRITE: usize = 0; - -/// At least one call to `notify` happened concurrently to `register` updating -/// the task cell. This state is detected when `register` exits the mutation -/// code and signals to `register` that it is responsible for notifying its own -/// task. 
-const LOCKED_WRITE_NOTIFIED: usize = 1; - - -/// The `notify` function has locked access to the task cell for notification. -/// -/// The constant is left here mostly for documentation reasons. -#[allow(dead_code)] -const LOCKED_READ: usize = 3; +// `AtomicTask` is a multi-consumer, single-producer transfer cell. The cell +// stores a `Task` value produced by calls to `register` and many threads can +// race to take the task (to notify it) by calling `notify. +// +// If a new `Task` instance is produced by calling `register` before an existing +// one is consumed, then the existing one is overwritten. +// +// While `AtomicTask` is single-producer, the implementation ensures memory +// safety. In the event of concurrent calls to `register`, there will be a +// single winner whose task will get stored in the cell. The losers will not +// have their tasks notified. As such, callers should ensure to add +// synchronization to calls to `register`. +// +// The implementation uses a single `AtomicUsize` value to coordinate access to +// the `Task` cell. There are two bits that are operated on independently. These +// are represented by `REGISTERING` and `NOTIFYING`. +// +// The `REGISTERING` bit is set when a producer enters the critical section. The +// `NOTIFYING` bit is set when a consumer enters the critical section. Neither +// bit being set is represented by `WAITING`. +// +// A thread obtains an exclusive lock on the task cell by transitioning the +// state from `WAITING` to `REGISTERING` or `NOTIFYING`, depending on the +// operation the thread wishes to perform. When this transition is made, it is +// guaranteed that no other thread will access the task cell. +// +// # Registering +// +// On a call to `register`, an attempt to transition the state from WAITING to +// REGISTERING is made. On success, the caller obtains a lock on the task cell. +// +// If the lock is obtained, then the thread sets the task cell to the task +// provided as an argument. 
Then it attempts to transition the state back from +// `REGISTERING` -> `WAITING`. +// +// If this transition is successful, then the registering process is complete +// and the next call to `notify` will observe the task. +// +// If the transition fails, then there was a concurrent call to `notify` that +// was unable to access the task cell (due to the registering thread holding the +// lock). To handle this, the registering thread removes the task it just set +// from the cell and calls `notify` on it. This call to notify represents the +// attempt to notify by the other thread (that set the `NOTIFYING` bit). The +// state is then transitioned from `REGISTERING | NOTIFYING` back to `WAITING`. +// This transition must succeed because, at this point, the state cannot be +// transitioned by another thread. +// +// # Notifying +// +// On a call to `notify`, an attempt to transition the state from `WAITING` to +// `NOTIFYING` is made. On success, the caller obtains a lock on the task cell. +// +// If the lock is obtained, then the thread takes ownership of the current value +// in the task cell, and calls `notify` on it. The state is then transitioned +// back to `WAITING`. This transition must succeed as, at this point, the state +// cannot be transitioned by another thread. +// +// If the thread is unable to obtain the lock, the `NOTIFYING` bit is still set. +// This is because it has either been set by the current thread but the previous +// value included the `REGISTERING` bit **or** a concurrent thread is in the +// `NOTIFYING` critical section. Either way, no action must be taken. +// +// If the current thread is the only concurrent call to `notify` and another +// thread is in the `register` critical section, when the other thread **exits** +// the `register` critical section, it will observe the `NOTIFYING` bit and +// handle the notify itself. +// +// If another thread is in the `notify` critical section, then it will handle +// notifying the task. 
+// +// # A potential race (is safely handled). +// +// Imagine the following situation: +// +// * Thread A obtains the `notify` lock and notifies a task. +// +// * Before thread A releases the `notify` lock, the notified task is scheduled. +// +// * Thread B attempts to notify the task. In theory this should result in the +// task being notified, but it cannot because thread A still holds the notify +// lock. +// +// This case is handled by requiring users of `AtomicTask` to call `register` +// **before** attempting to observe the application state change that resulted +// in the task being notified. The notifiers also change the application state +// before calling notify. +// +// Because of this, the task will do one of two things. +// +// 1) Observe the application state change that Thread B is notifying on. In +// this case, it is OK for Thread B's notification to be lost. +// +// 2) Call register before attempting to observe the application state. Since +// Thread A still holds the `notify` lock, the call to `register` will result +// in the task notifying itself and get scheduled again. + +/// Idle state +const WAITING: usize = 0; + +/// A new task value is being registered with the `AtomicTask` cell. +const REGISTERING: usize = 0b01; + +/// The task currently registered with the `AtomicTask` cell is being notified. +const NOTIFYING: usize = 0b10; impl AtomicTask { /// Create an `AtomicTask` initialized with the given `Task` @@ -73,6 +149,13 @@ impl AtomicTask { /// Registers the current task to be notified on calls to `notify`. /// + /// This is the same as calling `register_task` with `task::current()`. + pub fn register(&self) { + self.register_task(super::current()); + } + + /// Registers the provided task to be notified on calls to `notify`. + /// /// The new task will take place of any previous tasks that were registered /// by previous calls to `register`. 
Any calls to `notify` that happen after /// a call to `register` (as defined by the memory ordering rules), will @@ -86,39 +169,64 @@ impl AtomicTask { /// idea. Concurrent calls to `register` will attempt to register different /// tasks to be notified. One of the callers will win and have its task set, /// but there is no guarantee as to which caller will succeed. - pub fn register(&self) { - // Get a new task handle - let task = super::current(); - - match self.state.compare_and_swap(WAITING, LOCKED_WRITE, Acquire) { + pub fn register_task(&self, task: Task) { + match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) { WAITING => { unsafe { - // Locked acquired, update the task cell - *self.task.get() = Some(task); - - // Release the lock. If the state transitioned to - // `LOCKED_NOTIFIED`, this means that an notify has been - // signaled, so notify the task. - if LOCKED_WRITE_NOTIFIED == self.state.swap(WAITING, Release) { - (*self.task.get()).as_ref().unwrap().notify(); + // Locked acquired, update the waker cell + *self.task.get() = Some(task.clone()); + + // Release the lock. If the state transitioned to include + // the `NOTIFYING` bit, this means that a notify has been + // called concurrently, so we have to remove the task and + // notify it. + // + // Start by assuming that the state is `REGISTERING` as this + // is what we just set it to. + let res = self.state.compare_exchange( + REGISTERING, WAITING, AcqRel, Acquire); + + match res { + Ok(_) => {} + Err(actual) => { + // This branch can only be reached if a + // concurrent thread called `notify`. In this + // case, `actual` **must** be `REGISTERING | + // `NOTIFYING`. + debug_assert_eq!(actual, REGISTERING | NOTIFYING); + + // Take the task to notify once the atomic operation has + // completed. 
+ let notify = (*self.task.get()).take().unwrap(); + + // Just swap, because no one could change state + // while state == `Registering | `Waking` + self.state.swap(WAITING, AcqRel); + + // The atomic swap was complete, now + // notify the task and return. + notify.notify(); + } } } } - LOCKED_WRITE | LOCKED_WRITE_NOTIFIED => { - // A thread is concurrently calling `register`. This shouldn't - // happen as it doesn't really make much sense, but it isn't - // unsafe per se. Since two threads are concurrently trying to - // update the task, it's undefined which one "wins" (no ordering - // guarantees), so we can just do nothing. + NOTIFYING => { + // Currently in the process of notifying the task, i.e., + // `notify` is currently being called on the old task handle. + // So, we call notify on the new task handle + task.notify(); } state => { - debug_assert!(state != LOCKED_WRITE, "unexpected state LOCKED_WRITE"); - debug_assert!(state != LOCKED_WRITE_NOTIFIED, "unexpected state LOCKED_WRITE_NOTIFIED"); - - // Currently in a read locked state, this implies that `notify` - // is currently being called on the old task handle. So, we call - // notify on the new task handle - task.notify(); + // In this case, a concurrent thread is holding the + // "registering" lock. This probably indicates a bug in the + // caller's code as racing to call `register` doesn't make much + // sense. + // + // We just want to maintain memory safety. It is ok to drop the + // call to `register`. + debug_assert!( + state == REGISTERING || + state == REGISTERING | NOTIFYING); } } } @@ -127,49 +235,33 @@ impl AtomicTask { /// /// If `register` has not been called yet, then this does nothing. 
pub fn notify(&self) { - let mut curr = WAITING; - - loop { - if curr == LOCKED_WRITE { - // Transition the state to LOCKED_NOTIFIED - let actual = self.state.compare_and_swap(LOCKED_WRITE, LOCKED_WRITE_NOTIFIED, Release); - - if curr == actual { - // Success, return - return; - } - - // update current state variable and try again - curr = actual; - - } else if curr == LOCKED_WRITE_NOTIFIED { - // Currently in `LOCKED_WRITE_NOTIFIED` state, nothing else to do. - return; - - } else { - // Currently in a LOCKED_READ state, so attempt to increment the - // lock count. - let actual = self.state.compare_and_swap(curr, curr + 1, Acquire); - - // Locked acquired - if actual == curr { - // Notify the task - unsafe { - if let Some(ref task) = *self.task.get() { - task.notify(); - } - } + // AcqRel ordering is used in order to acquire the value of the `task` + // cell as well as to establish a `release` ordering with whatever + // memory the `AtomicTask` is associated with. + match self.state.fetch_or(NOTIFYING, AcqRel) { + WAITING => { + // The notifying lock has been acquired. + let task = unsafe { (*self.task.get()).take() }; - // Release the lock - self.state.fetch_sub(1, Release); + // Release the lock + self.state.fetch_and(!NOTIFYING, Release); - // Done - return; + if let Some(task) = task { + task.notify(); } - - // update current state variable and try again - curr = actual; - + } + state => { + // There is a concurrent thread currently updating the + // associated task. + // + // Nothing more to do as the `NOTIFYING` bit has been set. It + // doesn't matter if there are concurrent registering threads or + // not. 
+ // + debug_assert!( + state == REGISTERING || + state == REGISTERING | NOTIFYING || + state == NOTIFYING); } } } diff --git a/src/task_impl/core.rs b/src/task_impl/core.rs index 9a77a2aef9..d454116012 100644 --- a/src/task_impl/core.rs +++ b/src/task_impl/core.rs @@ -2,7 +2,9 @@ use core::marker; use core::mem; -use core::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; +use core::sync::atomic::AtomicUsize; +#[allow(deprecated)] +use core::sync::atomic::ATOMIC_USIZE_INIT; use core::sync::atomic::Ordering::{SeqCst, Relaxed}; use super::{BorrowedTask, NotifyHandle}; @@ -84,7 +86,9 @@ impl Drop for TaskUnpark { } } +#[allow(deprecated)] static GET: AtomicUsize = ATOMIC_USIZE_INIT; +#[allow(deprecated)] static SET: AtomicUsize = ATOMIC_USIZE_INIT; /// Initialize the `futures` task system. @@ -136,6 +140,15 @@ pub unsafe fn init(get: fn() -> *mut u8, set: fn(*mut u8)) -> bool { } } +/// Return whether the caller is running in a task (and so can use task_local!). +pub fn is_in_task() -> bool { + if let Some(ptr) = get_ptr() { + !ptr.is_null() + } else { + false + } +} + #[inline] pub fn get_ptr() -> Option<*mut u8> { match GET.load(Relaxed) { diff --git a/src/task_impl/mod.rs b/src/task_impl/mod.rs index 132173459f..6f1cf36c0c 100644 --- a/src/task_impl/mod.rs +++ b/src/task_impl/mod.rs @@ -24,13 +24,16 @@ pub struct BorrowedTask<'a> { } fn fresh_task_id() -> usize { - use core::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; + use core::sync::atomic::{AtomicUsize, Ordering}; + #[allow(deprecated)] + use core::sync::atomic::ATOMIC_USIZE_INIT; // TODO: this assert is a real bummer, need to figure out how to reuse // old IDs that are no longer in use. // // Note, though, that it is intended that these ids go away entirely // eventually, see the comment on `is_current` below. 
+ #[allow(deprecated)] static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT; let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); assert!(id < usize::max_value() / 2, @@ -54,6 +57,31 @@ fn with R, R>(f: F) -> R { /// the future as notifications arrive, until the future terminates. /// /// This is obtained by the `task::current` function. +/// +/// # FAQ +/// +/// ### Why does `Task` not implement `Eq` and `Hash`? +/// +/// A valid use case for `Task` to implement these two traits has not been +/// encountered. +/// +/// Usually, this question is asked by someone who wants to store a `Task` +/// instance in a `HashSet`. This seems like an obvious way to implement a +/// future aware, multi-handle structure; e.g. a multi-producer channel. +/// +/// In this case, the idea is that whenever a `start_send` is called on one of +/// the channel's send handles, if the channel is at capacity, the current task +/// is stored in a set. Then, when capacity is available, a task is removed from +/// the set and notified. +/// +/// The problem with this strategy is that multiple `Sender` handles can be used +/// on the same task. In this case, when the second handle is used and the task +/// is stored in a set, there already is an entry. Then, when the first +/// handle is dropped, this entry is cleared, resulting in a dead lock. +/// +/// See [here](https://github.com/rust-lang-nursery/futures-rs/issues/670) for +/// more discussion. +/// #[derive(Clone)] pub struct Task { id: usize, @@ -109,7 +137,7 @@ impl Task { /// Indicate that the task should attempt to poll its future in a timely /// fashion. /// - /// It's typically guaranteed that, for each call to `notify`, `poll` will + /// It's typically guaranteed that, after calling `notify`, `poll` will /// be called at least once subsequently (unless the future has terminated). 
/// If the task is currently polling its future when `notify` is called, it /// must poll the future *again* afterwards, ensuring that all relevant @@ -250,6 +278,19 @@ impl Spawn { self.obj } + /// Calls the provided closure, scheduling notifications to be sent to the + /// `notify` argument. + pub fn poll_fn_notify(&mut self, + notify: &N, + id: usize, + f: F) -> R + where F: FnOnce(&mut T) -> R, + N: Clone + Into, + { + let mk = || notify.clone().into(); + self.enter(BorrowedUnpark::new(&mk, id), f) + } + /// Polls the internal future, scheduling notifications to be sent to the /// `notify` argument. /// @@ -285,8 +326,7 @@ impl Spawn { where N: Clone + Into, T: Future, { - let mk = || notify.clone().into(); - self.enter(BorrowedUnpark::new(&mk, id), |f| f.poll()) + self.poll_fn_notify(notify, id, |f| f.poll()) } /// Like `poll_future_notify`, except polls the underlying stream. @@ -297,8 +337,7 @@ impl Spawn { where N: Clone + Into, T: Stream, { - let mk = || notify.clone().into(); - self.enter(BorrowedUnpark::new(&mk, id), |s| s.poll()) + self.poll_fn_notify(notify, id, |s| s.poll()) } /// Invokes the underlying `start_send` method with this task in place. @@ -314,8 +353,7 @@ impl Spawn { where N: Clone + Into, T: Sink, { - let mk = || notify.clone().into(); - self.enter(BorrowedUnpark::new(&mk, id), |s| s.start_send(value)) + self.poll_fn_notify(notify, id, |s| s.start_send(value)) } /// Invokes the underlying `poll_complete` method with this task in place. @@ -330,8 +368,7 @@ impl Spawn { where N: Clone + Into, T: Sink, { - let mk = || notify.clone().into(); - self.enter(BorrowedUnpark::new(&mk, id), |s| s.poll_complete()) + self.poll_fn_notify(notify, id, |s| s.poll_complete()) } /// Invokes the underlying `close` method with this task in place. 
@@ -346,8 +383,7 @@ impl Spawn { where N: Clone + Into, T: Sink, { - let mk = || notify.clone().into(); - self.enter(BorrowedUnpark::new(&mk, id), |s| s.close()) + self.poll_fn_notify(notify, id, |s| s.close()) } fn enter(&mut self, unpark: BorrowedUnpark, f: F) -> R @@ -687,3 +723,11 @@ impl From<&'static T> for NotifyHandle { unsafe { NotifyHandle::new(src as *const _ as *mut StaticRef) } } } + +#[cfg(feature = "nightly")] +mod nightly { + use super::NotifyHandle; + use core::marker::Unpin; + + impl Unpin for NotifyHandle {} +} diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs index 2472c8124e..e82a23e5d0 100644 --- a/src/task_impl/std/mod.rs +++ b/src/task_impl/std/mod.rs @@ -5,7 +5,9 @@ use std::fmt; use std::marker::PhantomData; use std::mem; use std::ptr; -use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT}; +use std::sync::{Arc, Mutex, Condvar, Once}; +#[allow(deprecated)] +use std::sync::ONCE_INIT; use std::sync::atomic::{AtomicUsize, Ordering}; use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink}; @@ -27,6 +29,12 @@ pub use task_impl::core::init; thread_local!(static CURRENT_TASK: Cell<*mut u8> = Cell::new(ptr::null_mut())); +/// Return whether the caller is running in a task (and so can use task_local!). +pub fn is_in_task() -> bool { + CURRENT_TASK.with(|task| !task.get().is_null()) +} + +#[allow(deprecated)] static INIT: Once = ONCE_INIT; pub fn get_ptr() -> Option<*mut u8> { @@ -211,19 +219,7 @@ impl TaskUnpark { } impl Spawn { - /// Polls the internal future, scheduling notifications to be sent to the - /// `unpark` argument. - /// - /// This method will poll the internal future, testing if it's completed - /// yet. The `unpark` argument is used as a sink for notifications sent to - /// this future. That is, while the future is being polled, any call to - /// `task::park()` will return a handle that contains the `unpark` - /// specified. 
- /// - /// If this function returns `NotReady`, then the `unpark` should have been - /// scheduled to receive a notification when poll can be called again. - /// Otherwise if `Ready` or `Err` is returned, the `Spawn` task can be - /// safely destroyed. + #[doc(hidden)] #[deprecated(note = "recommended to use `poll_future_notify` instead")] #[allow(deprecated)] pub fn poll_future(&mut self, unpark: Arc) -> Poll { @@ -248,25 +244,10 @@ impl Spawn { }) } - /// A specialized function to request running a future to completion on the - /// specified executor. - /// - /// This function only works for futures whose item and error types are `()` - /// and also implement the `Send` and `'static` bounds. This will submit - /// units of work (instances of `Run`) to the `exec` argument provided - /// necessary to drive the future to completion. - /// - /// When the future would block, it's arranged that when the future is again - /// ready it will submit another unit of work to the `exec` provided. This - /// will happen in a loop until the future has completed. - /// - /// This method is not appropriate for all futures, and other kinds of - /// executors typically provide a similar function with perhaps relaxed - /// bounds as well. - /// - /// Note that this method is likely to be deprecated in favor of the - /// `futures::Executor` trait and `execute` method, but if this'd cause - /// difficulty for you please let us know! + + #[doc(hidden)] + #[deprecated] + #[allow(deprecated)] pub fn execute(self, exec: Arc) where F: Future + Send + 'static, { @@ -285,9 +266,9 @@ impl Spawn { } impl Spawn { - /// Like `poll_future`, except polls the underlying stream. 
#[deprecated(note = "recommended to use `poll_stream_notify` instead")] #[allow(deprecated)] + #[doc(hidden)] pub fn poll_stream(&mut self, unpark: Arc) -> Poll, S::Error> { self.enter(BorrowedUnpark::Old(&unpark), |s| s.poll()) @@ -311,11 +292,7 @@ impl Spawn { } impl Spawn { - /// Invokes the underlying `start_send` method with this task in place. - /// - /// If the underlying operation returns `NotReady` then the `unpark` value - /// passed in will receive a notification when the operation is ready to be - /// attempted again. + #[doc(hidden)] #[deprecated(note = "recommended to use `start_send_notify` instead")] #[allow(deprecated)] pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc) @@ -323,13 +300,9 @@ impl Spawn { self.enter(BorrowedUnpark::Old(unpark), |s| s.start_send(value)) } - /// Invokes the underlying `poll_complete` method with this task in place. - /// - /// If the underlying operation returns `NotReady` then the `unpark` value - /// passed in will receive a notification when the operation is ready to be - /// attempted again. #[deprecated(note = "recommended to use `poll_flush_notify` instead")] #[allow(deprecated)] + #[doc(hidden)] pub fn poll_flush(&mut self, unpark: &Arc) -> Poll<(), S::SinkError> { self.enter(BorrowedUnpark::Old(unpark), |s| s.poll_complete()) @@ -418,6 +391,8 @@ pub trait Unpark: Send + Sync { /// Note that this trait is likely to be deprecated and/or renamed to avoid /// clashing with the `future::Executor` trait. If you've got a use case for /// this or would like to comment on the name please let us know! +#[deprecated] +#[allow(deprecated)] pub trait Executor: Send + Sync + 'static { /// Requests that `Run` is executed soon on the given executor. fn execute(&self, r: Run); @@ -425,16 +400,19 @@ pub trait Executor: Send + Sync + 'static { /// Units of work submitted to an `Executor`, currently only created /// internally. 
+#[deprecated] pub struct Run { spawn: Spawn + Send>>, inner: Arc, } +#[allow(deprecated)] struct RunInner { mutex: UnparkMutex, exec: Arc, } +#[allow(deprecated)] impl Run { /// Actually run the task (invoking `poll` on its future) on the current /// thread. @@ -462,6 +440,7 @@ impl Run { } } +#[allow(deprecated)] impl fmt::Debug for Run { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Run") @@ -470,6 +449,7 @@ impl fmt::Debug for Run { } } +#[allow(deprecated)] impl Notify for RunInner { fn notify(&self, _id: usize) { match self.mutex.notify() { @@ -728,3 +708,12 @@ impl From> for NotifyHandle } } } + +#[cfg(feature = "nightly")] +mod nightly { + use super::{TaskUnpark, UnparkEvents}; + use core::marker::Unpin; + + impl Unpin for TaskUnpark {} + impl Unpin for UnparkEvents {} +} diff --git a/src/unsync/mpsc.rs b/src/unsync/mpsc.rs index 68cf6b8cb9..ba0d52dc98 100644 --- a/src/unsync/mpsc.rs +++ b/src/unsync/mpsc.rs @@ -110,7 +110,11 @@ impl Drop for Sender { Some(shared) => shared, None => return, }; - if Rc::weak_count(&shared) == 0 { + // The number of existing `Weak` indicates if we are possibly the last + // `Sender`. If we are the last, we possibly must notify a blocked + // `Receiver`. `self.shared` is always one of the `Weak` to this shared + // data. Therefore the smallest possible Rc::weak_count(&shared) is 1. 
+ if Rc::weak_count(&shared) == 1 { if let Some(task) = shared.borrow_mut().blocked_recv.take() { // Wake up receiver as its stream has ended task.notify(); diff --git a/tests/all.rs b/tests/all.rs index bdd67315c5..40e402f553 100644 --- a/tests/all.rs +++ b/tests/all.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; use std::sync::mpsc::{channel, TryRecvError}; diff --git a/tests/bilock.rs b/tests/bilock.rs index 78d873635a..1658bdae27 100644 --- a/tests/bilock.rs +++ b/tests/bilock.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; use std::thread; diff --git a/tests/channel.rs b/tests/channel.rs index 58c611b5ad..7940de4509 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; use std::sync::atomic::*; @@ -52,6 +54,7 @@ fn drop_rx() { #[test] fn drop_order() { + #[allow(deprecated)] static DROPS: AtomicUsize = ATOMIC_USIZE_INIT; let (tx, rx) = mpsc::channel(1); diff --git a/tests/fuse.rs b/tests/fuse.rs index a1e6cee2f1..177d914e19 100644 --- a/tests/fuse.rs +++ b/tests/fuse.rs @@ -13,3 +13,27 @@ fn fuse() { assert!(future.poll_future_notify(¬ify_panic(), 0).unwrap().is_ready()); assert!(future.poll_future_notify(¬ify_panic(), 0).unwrap().is_not_ready()); } + +#[test] +fn fuse_is_done() { + use futures::future::{Fuse, FutureResult}; + + struct Wrapped(Fuse>); + + impl Future for Wrapped { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + assert!(!self.0.is_done()); + assert_eq!(self.0.poll().unwrap(), Async::Ready(2)); + assert!(self.0.is_done()); + assert_eq!(self.0.poll().unwrap(), Async::NotReady); + assert!(self.0.is_done()); + + Ok(Async::Ready(())) + } + } + + assert!(Wrapped(ok::(2).fuse()).wait().is_ok()); +} \ No newline at end of file diff --git a/tests/futures_ordered.rs b/tests/futures_ordered.rs index 229a8e58c0..6054192e3b 100644 --- a/tests/futures_ordered.rs 
+++ b/tests/futures_ordered.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; use std::any::Any; diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs index 9b8c08d01b..325a6f3e48 100644 --- a/tests/futures_unordered.rs +++ b/tests/futures_unordered.rs @@ -1,9 +1,12 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; use std::any::Any; use futures::sync::oneshot; -use futures::stream::futures_unordered; +use std::iter::FromIterator; +use futures::stream::{futures_unordered, FuturesUnordered}; use futures::prelude::*; mod support; @@ -125,3 +128,40 @@ fn iter_mut_len() { assert_eq!(iter_mut.len(), 0); assert!(iter_mut.next().is_none()); } + +#[test] +fn polled_only_once_at_most_per_iteration() { + #[derive(Debug, Clone, Copy, Default)] + struct F { + polled: bool, + } + + impl Future for F { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Result, Self::Error> { + if self.polled { + panic!("polled twice") + } else { + self.polled = true; + Ok(Async::NotReady) + } + } + } + + + let tasks = FuturesUnordered::from_iter(vec![F::default(); 10]); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert_eq!(10, tasks.get_mut().iter_mut().filter(|f| f.polled).count()); + + let tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert_eq!(33, tasks.get_mut().iter_mut().filter(|f| f.polled).count()); + + let tasks = FuturesUnordered::::new(); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready()); +} diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs index 253e015705..061616ae06 100644 --- a/tests/mpsc-close.rs +++ b/tests/mpsc-close.rs @@ -1,9 +1,12 @@ extern crate 
futures; +use std::sync::{Arc, Weak}; use std::thread; +use std::time::{Duration, Instant}; use futures::prelude::*; use futures::sync::mpsc::*; +use futures::task; #[test] fn smoke() { @@ -19,3 +22,131 @@ fn smoke() { t.join().unwrap() } + +// Stress test that `try_send()`s occurring concurrently with receiver +// close/drops don't appear as successful sends. +#[test] +fn stress_try_send_as_receiver_closes() { + const AMT: usize = 10000; + // To provide variable timing characteristics (in the hopes of + // reproducing the collision that leads to a race), we busy-re-poll + // the test MPSC receiver a variable number of times before actually + // stopping. We vary this countdown between 1 and the following + // value. + const MAX_COUNTDOWN: usize = 20; + // When we detect that a successfully sent item is still in the + // queue after a disconnect, we spin for up to 100ms to confirm that + // it is a persistent condition and not a concurrency illusion. + const SPIN_TIMEOUT_S: u64 = 10; + const SPIN_SLEEP_MS: u64 = 10; + struct TestRx { + rx: Receiver>, + // The number of times to query `rx` before dropping it. + poll_count: usize + } + struct TestTask { + command_rx: Receiver, + test_rx: Option>>, + countdown: usize, + } + impl TestTask { + /// Create a new TestTask + fn new() -> (TestTask, Sender) { + let (command_tx, command_rx) = channel::(0); + ( + TestTask { + command_rx: command_rx, + test_rx: None, + countdown: 0, // 0 means no countdown is in progress. + }, + command_tx, + ) + } + } + impl Future for TestTask { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll<(), ()> { + // Poll the test channel, if one is present. + if let Some(ref mut rx) = self.test_rx { + if let Ok(Async::Ready(v)) = rx.poll() { + let _ = v.expect("test finished unexpectedly!"); + } + self.countdown -= 1; + // Busy-poll until the countdown is finished. + task::current().notify(); + } + // Accept any newly submitted MPSC channels for testing. 
+ match self.command_rx.poll()? { + Async::Ready(Some(TestRx { rx, poll_count })) => { + self.test_rx = Some(rx); + self.countdown = poll_count; + task::current().notify(); + }, + Async::Ready(None) => return Ok(Async::Ready(())), + _ => {}, + } + if self.countdown == 0 { + // Countdown complete -- drop the Receiver. + self.test_rx = None; + } + Ok(Async::NotReady) + } + } + let (f, mut cmd_tx) = TestTask::new(); + let bg = thread::spawn(move || f.wait()); + for i in 0..AMT { + let (mut test_tx, rx) = channel(0); + let poll_count = i % MAX_COUNTDOWN; + cmd_tx.try_send(TestRx { rx: rx, poll_count: poll_count }).unwrap(); + let mut prev_weak: Option> = None; + let mut attempted_sends = 0; + let mut successful_sends = 0; + loop { + // Create a test item. + let item = Arc::new(()); + let weak = Arc::downgrade(&item); + match test_tx.try_send(item) { + Ok(_) => { + prev_weak = Some(weak); + successful_sends += 1; + } + Err(ref e) if e.is_full() => {} + Err(ref e) if e.is_disconnected() => { + // Test for evidence of the race condition. + if let Some(prev_weak) = prev_weak { + if prev_weak.upgrade().is_some() { + // The previously sent item is still allocated. + // However, there appears to be some aspect of the + // concurrency that can legitimately cause the Arc + // to be momentarily valid. Spin for up to 100ms + // waiting for the previously sent item to be + // dropped. + let t0 = Instant::now(); + let mut spins = 0; + loop { + if prev_weak.upgrade().is_none() { + break; + } + assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), + "item not dropped on iteration {} after \ + {} sends ({} successful). 
spin=({})", + i, attempted_sends, successful_sends, spins + ); + spins += 1; + thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); + } + } + } + break; + } + Err(ref e) => panic!("unexpected error: {}", e), + } + attempted_sends += 1; + } + } + drop(cmd_tx); + bg.join() + .expect("background thread join") + .expect("background thread result"); +} diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 8df98d490f..9cb83e5952 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -1,4 +1,5 @@ #![cfg(feature = "use_std")] +#![allow(bare_trait_objects, unknown_lints)] #[macro_use] extern crate futures; @@ -548,3 +549,19 @@ fn try_send_fail() { assert_eq!(rx.next(), Some(Ok("goodbye"))); assert!(rx.next().is_none()); } + +#[test] +fn bounded_is_really_bounded() { + use futures::Async::*; + let (mut tx, mut rx) = mpsc::channel(0); + lazy(|| { + assert!(tx.start_send(1).unwrap().is_ready()); + // Not ready until we receive + assert!(!tx.poll_complete().unwrap().is_ready()); + // Receive the value + assert_eq!(rx.poll().unwrap(), Ready(Some(1))); + // Now the sender is ready + assert!(tx.poll_complete().unwrap().is_ready()); + Ok::<_, ()>(()) + }).wait().unwrap(); +} diff --git a/tests/recurse.rs b/tests/recurse.rs index 4eb024ac95..a521ed13b7 100644 --- a/tests/recurse.rs +++ b/tests/recurse.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; use std::sync::mpsc::channel; diff --git a/tests/shared.rs b/tests/shared.rs index 99d2b381ea..97989fe2cb 100644 --- a/tests/shared.rs +++ b/tests/shared.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; mod support; @@ -202,3 +204,33 @@ fn recursive_poll_with_unpark() { drop(tx0); core.run(f3).unwrap(); } + +#[test] +fn shared_future_that_wakes_itself_until_pending_is_returned() { + use futures::Async; + use std::cell::Cell; + + let core = ::support::local_executor::Core::new(); + + let proceed = Cell::new(false); + let fut = futures::future::poll_fn(|| { + Ok::<_, 
()>(if proceed.get() { + Async::Ready(()) + } else { + futures::task::current().notify(); + Async::NotReady + }) + }) + .shared() + .map(|_| ()) + .map_err(|_| ()); + + // The join future can only complete if the second future gets a chance to run after the first + // has returned pending + let second = futures::future::lazy(|| { + proceed.set(true); + Ok::<_, ()>(()) + }); + + core.run(fut.join(second)).unwrap(); +} diff --git a/tests/sink.rs b/tests/sink.rs index cb2fdcf26d..460dbdf20c 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + extern crate futures; use std::mem; @@ -372,7 +374,7 @@ fn fanout_backpressure() { let sink = StartSendFut::new(sink, 0).wait().unwrap(); let sink = StartSendFut::new(sink, 1).wait().unwrap(); - + let flag = Flag::new(); let mut task = executor::spawn(sink.send(2)); assert!(!flag.get()); @@ -392,6 +394,12 @@ fn fanout_backpressure() { let (item, right_recv) = right_recv.into_future().wait().unwrap(); assert_eq!(item, Some(1)); assert!(flag.get()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); match task.poll_future_notify(&flag, 0).unwrap() { Async::Ready(_) => { }, diff --git a/tests/stream.rs b/tests/stream.rs index eb7560351d..2400a2abb1 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -1,3 +1,5 @@ +#![allow(bare_trait_objects, unknown_lints)] + #[macro_use] extern crate futures; diff --git a/tests/support/local_executor.rs b/tests/support/local_executor.rs index 615efc1f70..cf89e8152f 100644 --- a/tests/support/local_executor.rs +++ b/tests/support/local_executor.rs @@ -4,6 +4,8 @@ //! futures-aware inter-thread communications, and is not intended to be used to //! manage I/O. 
For futures that do I/O you'll likely want to use `tokio-core`. +#![allow(bare_trait_objects, unknown_lints)] + use std::cell::{Cell, RefCell}; use std::sync::{Arc, Mutex, mpsc}; diff --git a/tests/unsync.rs b/tests/unsync.rs index b5ae8d0fbf..490db0af1c 100644 --- a/tests/unsync.rs +++ b/tests/unsync.rs @@ -1,4 +1,5 @@ #![cfg(feature = "use_std")] +#![allow(bare_trait_objects, unknown_lints)] extern crate futures; @@ -201,3 +202,65 @@ fn spawn_kill_dead_stream() { }, } } + + +/// Test case for PR #768 (issue #766). +/// The issue was: +/// Given that an empty channel is polled by the Receiver, and the only Sender +/// gets dropped without sending anything, then the Receiver would get stuck. + +#[test] +fn dropped_sender_of_unused_channel_notifies_receiver() { + let core = Core::new(); + type FUTURE = Box>; + + // Constructs the channel which we want to test, and two futures which + // act on that channel. + let pair = |reverse| -> Vec { + // This is the channel which we want to test. + let (tx, rx) = mpsc::channel::(1); + let mut futures: Vec = vec![ + Box::new(futures::stream::iter_ok(vec![]) + .forward(tx) + .map_err(|_: mpsc::SendError| ()) + .map(|_| 42) + ), + Box::new(rx.fold((), |_, _| Ok(())) + .map(|_| 24) + ), + ]; + if reverse { + futures.reverse(); + } + futures + }; + + let make_test_future = |reverse| -> Box, Error=()>> { + let f = futures::future::join_all(pair(reverse)); + + // Use a timeout. This is not meant to test the `sync::oneshot` but + // merely uses it to implement this timeout. + let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel::>(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(1000)); + let x = timeout_tx.send(vec![0]); + assert!(x.is_err(), "Test timed out."); + }); + + Box::new(f.select(timeout_rx.map_err(|_|())) + .map_err(|x| x.0) + .map(|x| x.0) + ) + }; + + // The order of the tested futures is important to test fix of PR #768. 
+ // We want future_2 to poll on the Receiver before the Sender is dropped. + let result = core.run(make_test_future(false)); + assert!(result.is_ok()); + assert_eq!(vec![42, 24], result.unwrap()); + + // Test also the other ordering: + let result = core.run(make_test_future(true)); + assert!(result.is_ok()); + assert_eq!(vec![24, 42], result.unwrap()); +}