@peterxcli
Member

Which issue does this PR close?

Rationale for this change

  • Logical plans can include file sources. To serialize/deserialize logical plans (e.g., to ship between processes, store/replay), DataFusion needs to encode the file format configuration (CSV/JSON/Parquet/Arrow).
  • Previously, the default codec didn’t implement try_encode_file_format / try_decode_file_format (it returned “not implemented”). This change makes the default codec able to handle the common formats out-of-the-box.

What changes are included in this PR?

  • New try_decode_file_format(&self, buf: &[u8], ctx: &SessionContext) -> Result<Arc<dyn FileFormatFactory>> in DefaultLogicalExtensionCodec
  • New try_encode_file_format(&self, buf: &mut Vec<u8>, node: Arc<dyn FileFormatFactory>) -> Result<()> in DefaultLogicalExtensionCodec
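
To make the before/after behaviour concrete, here is a minimal, self-contained sketch. It does not use DataFusion: `FormatConfig`, `CodecError`, `FormatCodec`, `OldDefaultCodec`, and `NewDefaultCodec` are hypothetical stand-ins, the method signatures are simplified (no `SessionContext`, no `Arc<dyn FileFormatFactory>`), and the byte layout is invented purely for illustration.

```rust
/// Hypothetical stand-in for a file format's configuration.
#[derive(Debug, Clone, PartialEq)]
enum FormatConfig {
    Csv { delimiter: u8, has_header: bool },
    Json,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum CodecError {
    NotImplemented(&'static str),
    Invalid(String),
}

/// Simplified codec trait. Both methods default to "not implemented",
/// which is the behaviour the description attributes to the old default codec.
trait FormatCodec {
    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: &FormatConfig,
    ) -> Result<(), CodecError> {
        Err(CodecError::NotImplemented("try_encode_file_format"))
    }

    fn try_decode_file_format(&self, _buf: &[u8]) -> Result<FormatConfig, CodecError> {
        Err(CodecError::NotImplemented("try_decode_file_format"))
    }
}

/// Keeps the trait defaults, so every call fails with `NotImplemented`.
struct OldDefaultCodec;
impl FormatCodec for OldDefaultCodec {}

/// Overrides both methods so the built-in formats round-trip.
struct NewDefaultCodec;
impl FormatCodec for NewDefaultCodec {
    fn try_encode_file_format(
        &self,
        buf: &mut Vec<u8>,
        node: &FormatConfig,
    ) -> Result<(), CodecError> {
        match node {
            // Invented layout: tag byte 0, then the delimiter, then the header flag.
            FormatConfig::Csv { delimiter, has_header } => {
                buf.extend_from_slice(&[0, *delimiter, u8::from(*has_header)]);
            }
            // Invented layout: tag byte 1, no payload.
            FormatConfig::Json => buf.push(1),
        }
        Ok(())
    }

    fn try_decode_file_format(&self, buf: &[u8]) -> Result<FormatConfig, CodecError> {
        match buf {
            [0, delimiter, header] => Ok(FormatConfig::Csv {
                delimiter: *delimiter,
                has_header: *header != 0,
            }),
            [1] => Ok(FormatConfig::Json),
            // Anything else is rejected rather than guessed at.
            _ => Err(CodecError::Invalid(format!(
                "unsupported format bytes ({} bytes)",
                buf.len()
            ))),
        }
    }
}
```

In this sketch, encoding a CSV config with a `|` delimiter through `NewDefaultCodec` and decoding the bytes yields an equal value, while `OldDefaultCodec` returns `NotImplemented` for the same bytes.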

Are these changes tested?

  • direct test:
    • tests added inside datafusion/proto/tests/cases/roundtrip_logical_plan.rs
  • indirect test:
    • roundtrip_arrow_scan — Uses logical_plan_to_bytes() and logical_plan_from_bytes(), which use DefaultLogicalExtensionCodec, so it exercises Arrow file format encoding/decoding.
    • roundtrip_custom_listing_tables — Uses the default codec and may involve file formats.

Are there any user-facing changes?

Yes. The default codec can now handle the common formats out of the box.

…ExtensionCodec

* Implemented `try_decode_file_format` and `try_encode_file_format` methods to handle various file formats (CSV, JSON, Parquet, Arrow).
* Introduced internal structures for file format handling, including `FileFormatKind`, `FileFormatProtoWrapper`, and `FileFormatWrapper`.
* Enhanced error handling for unsupported file formats.
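
The idea of tagging each payload with a format kind can be sketched in a few lines of plain Rust. This is an illustration under stated assumptions, not the PR's wire format: the PR wraps the payload in a prost message, whereas this sketch uses a single leading tag byte, and `from_tag`, `wrap`, and `unwrap_envelope` are hypothetical helper names. Only the enum name and the numeric order of the four kinds follow the PR.

```rust
/// Kind tag for the four built-in formats (numbering as in the PR's decoder).
#[derive(Debug, Clone, Copy, PartialEq)]
enum FileFormatKind {
    Csv = 0,
    Json = 1,
    Parquet = 2,
    Arrow = 3,
}

impl FileFormatKind {
    /// Hypothetical helper: map a raw tag back to a kind, rejecting unknown tags.
    fn from_tag(tag: u8) -> Result<Self, String> {
        match tag {
            0 => Ok(FileFormatKind::Csv),
            1 => Ok(FileFormatKind::Json),
            2 => Ok(FileFormatKind::Parquet),
            3 => Ok(FileFormatKind::Arrow),
            other => Err(format!("unknown file format kind {}", other)),
        }
    }
}

/// Hypothetical envelope: one kind byte followed by the format-specific payload.
fn wrap(kind: FileFormatKind, blob: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(1 + blob.len());
    out.push(kind as u8);
    out.extend_from_slice(blob);
    out
}

/// Split an envelope into its kind and payload; empty or unknown input is an error.
fn unwrap_envelope(buf: &[u8]) -> Result<(FileFormatKind, &[u8]), String> {
    let (tag, blob) = buf
        .split_first()
        .ok_or_else(|| "empty buffer".to_string())?;
    Ok((FileFormatKind::from_tag(*tag)?, blob))
}
```

Because the kind is read first, the decoder can dispatch directly to the matching format-specific codec and return an explicit error for an unknown tag instead of trying each codec in turn.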
@github-actions github-actions bot added the proto Related to proto crate label Dec 3, 2025
@peterxcli peterxcli changed the title feat: add file format encoding and decoding support to DefaultLogical… Make DefaultLogicalExtensionCodec support serialisation of build in file formats Dec 3, 2025
@peterxcli peterxcli force-pushed the make-DefaultLogicalExtensionCodec-support-serialisation-of-build-in-file-formats branch from 7935eb2 to bb09871 Compare December 4, 2025 02:45
@peterxcli
Member Author

cc @milenkovicm Please take a look. Thanks!

Contributor

@milenkovicm milenkovicm left a comment

Thanks @peterxcli, it looks good at a quick glance; I will have a better look later.
I have two comments to start with.

Contributor

Why is this file added? Perhaps it's part of a different PR?

Member Author

Oh sorry, I accidentally committed this file; it is just for my own reading.
Thanks for catching it, I will revert.

@peterxcli peterxcli requested a review from milenkovicm December 5, 2025 08:01
Contributor

@milenkovicm milenkovicm left a comment

Thanks for your contribution, @peterxcli.
I have some follow-up questions; please have a look when you get a chance.
Also, I'd like to ask to what extent you used coding assistant(s) for this PR?

Comment on lines +268 to +308
fn try_decode_roundtrip(
    ctx: &TaskContext,
    buf: &[u8],
) -> Option<Arc<dyn FileFormatFactory>> {
    #[cfg(feature = "parquet")]
    let candidates: &[&dyn LogicalExtensionCodec] = &[
        &ParquetLogicalExtensionCodec {},
        &CsvLogicalExtensionCodec {},
        &JsonLogicalExtensionCodec {},
    ];
    #[cfg(not(feature = "parquet"))]
    let candidates: &[&dyn LogicalExtensionCodec] =
        &[&CsvLogicalExtensionCodec {}, &JsonLogicalExtensionCodec {}];

    for codec in candidates {
        if let Ok(ff) = codec.try_decode_file_format(buf, ctx) {
            let mut re = Vec::new();
            if codec
                .try_encode_file_format(
                    &mut re,
                    Arc::<dyn FileFormatFactory>::clone(&ff),
                )
                .is_ok()
                && re == buf
            {
                return Some(ff);
            }
        }
    }
    None
}

if let Some(ff) = try_decode_roundtrip(ctx, buf) {
    return Ok(ff);
}

// If nothing matched, return a clear error rather than guessing
exec_err!(
    "Unsupported FileFormatFactory bytes for DefaultLogicalExtensionCodec ({} bytes)",
    buf.len()
)
Contributor

What's the purpose of this code?

Comment on lines +365 to +371
#[derive(Clone, PartialEq, ::prost::Message)]
struct FileFormatProtoWrapper {
    #[prost(int32, tag = "1")]
    kind: i32,
    #[prost(bytes, tag = "2")]
    blob: Vec<u8>,
}
Contributor

Would it make sense to have this as part of the proto file?

Comment on lines +373 to +411
struct FileFormatWrapper {
    kind: FileFormatKind,
    blob: Vec<u8>,
}

impl FileFormatWrapper {
    fn new(kind: FileFormatKind, blob: Vec<u8>) -> Self {
        Self { kind, blob }
    }

    fn encode_into(self, buf: &mut Vec<u8>) -> Result<()> {
        let proto = FileFormatProtoWrapper {
            kind: self.kind as i32,
            blob: self.blob,
        };
        proto
            .encode(buf)
            .map_err(|e| DataFusionError::Internal(e.to_string()))
    }

    fn try_decode(buf: &[u8]) -> Result<Self> {
        let proto = FileFormatProtoWrapper::decode(buf)
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
        let kind = match proto.kind {
            0 => FileFormatKind::Csv,
            1 => FileFormatKind::Json,
            2 => FileFormatKind::Parquet,
            3 => FileFormatKind::Arrow,
            _ => {
                return Err(DataFusionError::Internal(
                    "Unknown file format kind".to_string(),
                ))
            }
        };
        Ok(Self {
            kind,
            blob: proto.blob,
        })
    }
Contributor

Do we need a structure in addition to the actual decode function?

}

#[test]
fn test_default_codec_legacy_raw_bytes_roundtrip() -> Result<()> {
Contributor

This functionality was not implemented before, so how can we have a legacy implementation?

}

#[test]
fn test_default_codec_legacy_empty_buffer() -> Result<()> {
Contributor

This functionality was not implemented before, so how can we have a legacy implementation?

Comment on lines +2852 to +2853
csv_format.delimiter = b'|';
csv_format.has_header = Some(true);
Contributor

What's the purpose of setting these values if they are not checked later?


// Test JSON with custom options
let mut json_format = table_options.json.clone();
json_format.compression = CompressionTypeVariant::GZIP;
Contributor

What's the purpose of setting this value if it is not checked later?


// Test Parquet with custom options
let mut parquet_format = table_options.parquet.clone();
parquet_format.global.bloom_filter_on_read = true;
Contributor

What's the purpose of setting this value if it is not checked later?

Labels

proto Related to proto crate

Development

Successfully merging this pull request may close these issues.

make DefaultLogicalExtensionCodec support serialisation of build in file formats

2 participants