Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: add CachedFileReader to wrap opendal reader
  • Loading branch information
kyteware committed Jun 9, 2025
commit acfcd7a4a402b467d145c96e871187fa088e2e01
64 changes: 62 additions & 2 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ use bytes::Bytes;
use opendal::Operator;
use url::Url;

use super::data_cache::{DataCache, DataCacheRef, DataCacheRes};
use super::storage::Storage;
use crate::{Error, ErrorKind, Result};

const DEFAULT_MAX_CACHE_SIZE: u64 = 32 * 1000 * 1000; // 32 Mb

/// FileIO implementation, used to manipulate files in underlying storage.
///
/// # Note
Expand All @@ -44,8 +47,8 @@ use crate::{Error, ErrorKind, Result};
#[derive(Clone, Debug)]
pub struct FileIO {
builder: FileIOBuilder,

inner: Arc<Storage>,
cache: DataCacheRef,
}

impl FileIO {
Expand Down Expand Up @@ -145,6 +148,7 @@ impl FileIO {
op,
path,
relative_path_pos,
cache: self.cache.clone(),
})
}

Expand All @@ -161,6 +165,7 @@ impl FileIO {
op,
path,
relative_path_pos,
cache: self.cache.clone(),
})
}
}
Expand All @@ -174,6 +179,8 @@ pub struct FileIOBuilder {
scheme_str: Option<String>,
/// Arguments for operator.
props: HashMap<String, String>,
/// Maximum capacity of the hash, in bytes
cache_size: u64,
}

impl FileIOBuilder {
Expand All @@ -183,6 +190,7 @@ impl FileIOBuilder {
Self {
scheme_str: Some(scheme_str.to_string()),
props: HashMap::default(),
cache_size: DEFAULT_MAX_CACHE_SIZE,
}
}

Expand All @@ -191,6 +199,7 @@ impl FileIOBuilder {
Self {
scheme_str: None,
props: HashMap::default(),
cache_size: DEFAULT_MAX_CACHE_SIZE,
}
}

Expand All @@ -217,12 +226,20 @@ impl FileIOBuilder {
self
}

/// Set the maximum cache size in bytes
pub fn with_cache_size(mut self, cache_size: u64) -> Self {
self.cache_size = cache_size;
self
}

/// Builds [`FileIO`].
pub fn build(self) -> Result<FileIO> {
let storage = Storage::build(self.clone())?;
let cache_size = self.cache_size;
Ok(FileIO {
builder: self,
inner: Arc::new(storage),
cache: Arc::new(DataCache::new(cache_size)),
})
}
}
Expand All @@ -249,6 +266,38 @@ pub trait FileRead: Send + Sync + Unpin + 'static {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}

/// A file reader that can cache read buffers for future use
pub struct CachedFileReader {
reader: opendal::Reader,
cache: DataCacheRef,
path: String,
}

#[async_trait::async_trait]
impl FileRead for CachedFileReader {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
match self.cache.get(&self.path, range.clone()).await {
DataCacheRes::Hit(res) => Ok(res),
DataCacheRes::PartialHit(partial_hit) => {
let missing_bytes = self
.reader
.read(partial_hit.missing_range())
.await?
.to_bytes();
Ok(self
.cache
.fill_partial_hit(partial_hit, missing_bytes)
.await)
}
DataCacheRes::Miss => {
let res = self.reader.read(range.clone()).await?.to_bytes();
self.cache.set(&self.path, range, res.clone()).await;
Ok(res)
}
}
}
}

#[async_trait::async_trait]
impl FileRead for opendal::Reader {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
Expand All @@ -264,6 +313,7 @@ pub struct InputFile {
path: String,
// Relative path of file to uri, starts at [`relative_path_pos`]
relative_path_pos: usize,
cache: DataCacheRef,
}

impl InputFile {
Expand All @@ -289,6 +339,7 @@ impl InputFile {
/// Read and returns whole content of file.
///
/// For continuous reading, use [`Self::reader`] instead.
// TODO: implement cache-level understanding of file size and completeness so this function can use cache too
pub async fn read(&self) -> crate::Result<Bytes> {
Ok(self
.op
Expand All @@ -300,8 +351,15 @@ impl InputFile {
/// Creates [`FileRead`] for continuous reading.
///
/// For one-time reading, use [`Self::read`] instead.
// TODO: figure out how to cache reads with a reader
pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
let direct_reader = self.op.reader(&self.path[self.relative_path_pos..]).await?;

Ok(CachedFileReader {
reader: direct_reader,
cache: self.cache.clone(),
path: self.path.clone(),
})
}
}

Expand Down Expand Up @@ -344,6 +402,7 @@ pub struct OutputFile {
path: String,
// Relative path of file to uri, starts at [`relative_path_pos`]
relative_path_pos: usize,
cache: DataCacheRef, // TODO: cache writes to output files to prevent unecessary reads
}

impl OutputFile {
Expand All @@ -370,6 +429,7 @@ impl OutputFile {
op: self.op,
path: self.path,
relative_path_pos: self.relative_path_pos,
cache: self.cache,
}
}

Expand Down