Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: float encoding
  • Loading branch information
darthunix committed Nov 29, 2025
commit a34b5f3d53f675867d2b13335aefa7626ff43583
18 changes: 17 additions & 1 deletion executor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::sql::Catalog;
use anyhow::Error;
use anyhow::{bail, Result};
use common::FusionError;
use datafusion::arrow::array::{Array, Int32Array, Int64Array, StringArray};
use datafusion::arrow::array::{Array, Float32Array, Float64Array, Int32Array, Int64Array, StringArray};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::config::ConfigOptions;
use datafusion::execution::SessionStateBuilder;
Expand Down Expand Up @@ -518,6 +518,22 @@ fn encode_and_write_rows(
let v = a.value(row);
fields.push(F::ByVal8(v.to_le_bytes()));
}
datafusion::arrow::datatypes::DataType::Float32 => {
let v = array
.as_any()
.downcast_ref::<Float32Array>()
.expect("float32 array")
.value(row);
fields.push(F::ByVal4(v.to_le_bytes()));
}
datafusion::arrow::datatypes::DataType::Float64 => {
let v = array
.as_any()
.downcast_ref::<Float64Array>()
.expect("float64 array")
.value(row);
fields.push(F::ByVal8(v.to_le_bytes()));
}
datafusion::arrow::datatypes::DataType::Utf8 => {
if let Some(idx) = idx_ref {
fields.push(F::ByRef(owned[*idx].as_slice()));
Expand Down
23 changes: 22 additions & 1 deletion storage/src/heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<Scal

let v = match atttypid {
x if x == BOOLOID => ScalarValue::Boolean(Some(bytes[0] != 0)),
// PostgreSQL internal single-byte "char" type (not BPCHAR)
x if x == CHAROID => {
let ch = bytes[0] as char;
ScalarValue::Utf8(Some(ch.to_string()))
}
x if x == INT2OID => {
let mut a = [0u8; 2];
a.copy_from_slice(bytes);
Expand All @@ -297,6 +302,13 @@ fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<Scal
a.copy_from_slice(bytes);
ScalarValue::Int32(Some(i32::from_ne_bytes(a)))
}
// OID maps to Int32 for now (DataFusion UInt32 available but keep consistent with common integer use)
x if x == OIDOID => {
let mut a = [0u8; 4];
a.copy_from_slice(bytes);
let v = u32::from_ne_bytes(a) as i32;
ScalarValue::Int32(Some(v))
}
x if x == INT8OID => {
let mut a = [0u8; 8];
a.copy_from_slice(bytes);
Expand Down Expand Up @@ -358,6 +370,13 @@ fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<Scal
},
))
}
// name is fixed-length (NAMEDATALEN) with trailing NULs
x if x == NAMEOID => {
let end = bytes.iter().position(|&b| b == 0).unwrap_or(bytes.len());
let s = std::str::from_utf8(&bytes[..end])
.map_err(|e| anyhow::anyhow!("invalid utf8 in name: {}", e))?;
ScalarValue::Utf8(Some(s.to_string()))
}
_ => return Ok(None),
};
Ok(Some(v))
Expand Down Expand Up @@ -388,12 +407,14 @@ fn typed_null_for(atttypid: pg_sys::Oid) -> ScalarValue {
use pg_sys::*;
match atttypid {
x if x == BOOLOID => ScalarValue::Boolean(None),
x if x == CHAROID => ScalarValue::Utf8(None),
x if x == INT2OID => ScalarValue::Int16(None),
x if x == INT4OID => ScalarValue::Int32(None),
x if x == OIDOID => ScalarValue::Int32(None),
x if x == INT8OID => ScalarValue::Int64(None),
x if x == FLOAT4OID => ScalarValue::Float32(None),
x if x == FLOAT8OID => ScalarValue::Float64(None),
x if x == TEXTOID || x == VARCHAROID || x == BPCHAROID => ScalarValue::Utf8(None),
x if x == TEXTOID || x == VARCHAROID || x == BPCHAROID || x == NAMEOID => ScalarValue::Utf8(None),
x if x == DATEOID => ScalarValue::Date32(None),
x if x == TIMEOID => ScalarValue::Time64Microsecond(None),
x if x == TIMESTAMPOID || x == TIMESTAMPTZOID => {
Expand Down