Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: Implement find combinator for StreamExt
  • Loading branch information
aaamourao committed Apr 19, 2025
commit 9bb7938239c8f9061a9c9f3c9bd35592e8493ac1
84 changes: 84 additions & 0 deletions futures-util/src/stream/stream/find.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use crate::Stream;

use core::future::Future;
use core::pin::Pin;
use core::task::{ready, Context, Poll};
use futures_core::FusedFuture;
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`find`](super::StreamExt::find) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Find<St, Fut, F>
where St: Stream,
{
#[pin]
stream: St,
f: F,
done: bool,
#[pin]
pending_fut: Option<Fut>,
pending_item: Option<St::Item>,
}
}

impl<St, Fut, F> Find<St, Fut, F>
where
St: Stream,
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
pub(super) fn new(stream: St, f: F) -> Self {
Self { stream, f, done: false, pending_fut: None, pending_item: None }
}
}

impl<St, Fut, F> FusedFuture for Find<St, Fut, F>
where
St: Stream,
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
fn is_terminated(&self) -> bool {
self.done && self.pending_fut.is_none()
}
}

impl<St, Fut, F> Future for Find<St, Fut, F>
where
St: futures_core::Stream,
F: FnMut(&St::Item) -> Fut,
Fut: Future<Output = bool>,
{
type Output = Option<St::Item>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
let mut this = self.project();
Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
// we're currently processing a future to produce a new value
let res = ready!(fut.poll(cx));
this.pending_fut.set(None);
if res {
*this.done = true;
break this.pending_item.take();
}
} else if !*this.done {
match ready!(this.stream.as_mut().poll_next(cx)) {
// we're waiting on a new item from the stream
Some(item) => {
this.pending_fut.set(Some((this.f)(&item)));
*this.pending_item = Some(item);
}
None => {
*this.done = true;
break None;
}
}
} else {
break None;
}
})
}
}
26 changes: 26 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub use self::filter::Filter;
mod filter_map;
pub use self::filter_map::FilterMap;

mod find;
use self::find::Find;

mod flatten;

delegate_all!(
Expand Down Expand Up @@ -672,6 +675,29 @@ pub trait StreamExt: Stream {
assert_future::<T, _>(Fold::new(self, f, init))
}

/// Execute asynchronous predicate over asynchronous stream, and return `Option<Stream::Item>`
/// if any element in stream satisfied the predicate.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
///
/// let number_stream = stream::iter([1, 3, 6, 5, 9]);
/// let first_even = number_stream.find(|&i| async move { i % 2 == 0 });
/// assert_eq!(first_even.await, Some(6));
/// # });
/// ```
fn find<Fut, F>(self, f: F) -> Find<Self, Fut, F>
where
Self: Sized,
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>,
{
assert_future::<Option<Self::Item>, _>(Find::new(self, f))
}

/// Execute predicate over asynchronous stream, and return `true` if any element in stream satisfied a predicate.
///
/// # Examples
Expand Down