Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/oli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ http-body = "1"
log = "0.4"
md-5 = "0.10"
percent-encoding = "2"
pin-project-lite = { version = "0.2.16"}
quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
reqwest = { version = "0.12.22", features = [
"stream",
Expand Down
5 changes: 4 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@
//! - [Performance Guide][crate::docs::performance]

// Make sure all our public APIs have docs.
#![warn(missing_docs)]
#![deny(missing_docs)]

// Private modules, they will not be accessed by users.
mod patches;

// Private module with public types, they will be accessed via `opendal::Xxxx`
mod types;
Expand Down
158 changes: 158 additions & 0 deletions core/src/patches/buffer_by_ordered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use futures::future::Future;
use futures::stream::Fuse;
use futures::stream::FuturesOrdered;
use futures::stream::Stream;
use futures::stream::StreamExt;

use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`buffer_by_ordered`] method.
///
/// [`buffer_by_ordered`]: crate::StreamExt::buffer_by_ordered
#[must_use = "streams do nothing unless polled"]
pub struct BufferByOrdered<St, F>
where
St: Stream<Item = (F, usize)>,
F: Future,
{
#[pin]
stream: Fuse<St>,
in_progress_queue: FuturesOrdered<SizedFuture<F>>,
ready_queue: VecDeque<(F::Output, usize)>,
max_size: usize,
current_size: usize,
}
}

impl<St, F> fmt::Debug for BufferByOrdered<St, F>
where
St: Stream<Item = (F, usize)> + fmt::Debug,
F: Future,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BufferByOrdered")
.field("stream", &self.stream)
.field("in_progress_queue", &self.in_progress_queue)
.field("max_size", &self.max_size)
.field("current_size", &self.current_size)
.finish()
}
}

impl<St, F> BufferByOrdered<St, F>
where
St: Stream<Item = (F, usize)>,
F: Future,
{
pub(crate) fn new(stream: St, max_size: usize) -> Self {
Self {
stream: stream.fuse(),
in_progress_queue: FuturesOrdered::new(),
ready_queue: VecDeque::new(),
max_size,
current_size: 0,
}
}
}

impl<St, F> Stream for BufferByOrdered<St, F>
where
St: Stream<Item = (F, usize)>,
F: Future,
{
type Item = F::Output;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
while this.current_size < this.max_size {
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some((future, size))) => {
*this.current_size += size;
this.in_progress_queue
.push_back(SizedFuture { future, size });
}
Poll::Ready(None) => break,
Poll::Pending => break,
}
}

// Try to poll all ready futures in the in_progress_queue.
loop {
match this.in_progress_queue.poll_next_unpin(cx) {
Poll::Ready(Some(output)) => {
this.ready_queue.push_back(output);
}
Poll::Ready(None) => break,
Poll::Pending => break,
}
}

if let Some((output, size)) = this.ready_queue.pop_front() {
// If we have any ready outputs, return the first one.
*this.current_size -= size;
Poll::Ready(Some(output))
} else if this.stream.is_done() && this.in_progress_queue.is_empty() {
Poll::Ready(None)
} else {
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let queue_len = self.in_progress_queue.len() + self.ready_queue.len();
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
}

pin_project! {
struct SizedFuture<F> {
#[pin]
future: F,
size: usize,
}
}

impl<F: Future> Future for SizedFuture<F> {
type Output = (F::Output, usize);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.future.poll(cx) {
Poll::Ready(output) => Poll::Ready((output, *this.size)),
Poll::Pending => Poll::Pending,
}
}
}
19 changes: 19 additions & 0 deletions core/src/patches/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub(crate) mod buffer_by_ordered;
pub(crate) mod stream;
34 changes: 34 additions & 0 deletions core/src/patches/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::patches::buffer_by_ordered::BufferByOrdered;

use futures::Stream;
use std::future::Future;

pub trait StreamExt: Stream {
fn buffer_by_ordered<F>(self, max_size: usize) -> BufferByOrdered<Self, F>
where
Self: Sized,
Self: Stream<Item = (F, usize)>,
F: Future,
{
BufferByOrdered::new(self, max_size)
}
}

impl<T: ?Sized> StreamExt for T where T: Stream {}
14 changes: 10 additions & 4 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,17 @@ use futures::FutureExt;
use crate::*;

/// BoxedFuture is the type alias of [`futures::future::BoxFuture`].
///
/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target.
#[cfg(not(target_arch = "wasm32"))]
pub type BoxedFuture<'a, T> = futures::future::BoxFuture<'a, T>;
#[cfg(target_arch = "wasm32")]
/// BoxedFuture is the type alias of [`futures::future::LocalBoxFuture`].
pub type BoxedFuture<'a, T> = futures::future::LocalBoxFuture<'a, T>;

/// BoxedStaticFuture is the type alias of [`futures::future::BoxFuture`].
///
/// We will switch to [`futures::future::LocalBoxFuture`] on wasm32 target.
#[cfg(not(target_arch = "wasm32"))]
pub type BoxedStaticFuture<T> = futures::future::BoxFuture<'static, T>;
#[cfg(target_arch = "wasm32")]
/// BoxedStaticFuture is the type alias of [`futures::future::LocalBoxFuture`].
pub type BoxedStaticFuture<T> = futures::future::LocalBoxFuture<'static, T>;

/// MaybeSend is a marker to determine whether a type is `Send` or not.
Expand All @@ -49,6 +47,14 @@ pub type BoxedStaticFuture<T> = futures::future::LocalBoxFuture<'static, T>;
/// And it's empty trait on wasm32 target to indicate that a type is not `Send`.
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSend: Send {}

/// MaybeSend is a marker to determine whether a type is `Send` or not.
/// We use this trait to wrap the `Send` requirement for wasm32 target.
///
/// # Safety
///
/// [`MaybeSend`] is equivalent to `Send` on non-wasm32 target.
/// And it's empty trait on wasm32 target to indicate that a type is not `Send`.
#[cfg(target_arch = "wasm32")]
pub trait MaybeSend {}

Expand Down
1 change: 1 addition & 0 deletions core/src/raw/oio/list/page_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub trait PageList: Send + Sync + Unpin + 'static {
#[cfg(not(target_arch = "wasm32"))]
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>> + MaybeSend;
#[cfg(target_arch = "wasm32")]
/// next_page is used to fetch next page of entries from underlying storage.
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>>;
}

Expand Down
6 changes: 3 additions & 3 deletions core/src/types/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use std::sync::Arc;

use bytes::BufMut;
use futures::stream;
use futures::StreamExt;
use futures::TryStreamExt;

use crate::patches::stream::StreamExt;
use crate::*;

/// Reader is designed to read data from given path in an asynchronous
Expand Down Expand Up @@ -147,8 +147,8 @@ impl Reader {
let merged_ranges = self.merge_ranges(ranges.clone());

let merged_bufs: Vec<_> =
stream::iter(merged_ranges.clone().into_iter().map(|v| self.read(v)))
.buffered(self.ctx.options().concurrent())
stream::iter(merged_ranges.clone().into_iter().map(|v| (self.read(v), 1)))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second value is size.

.buffer_by_ordered(self.ctx.options().concurrent())
.try_collect()
.await?;

Expand Down
Loading