Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
perf(heap): reduce allocations on heap repack with sort
  • Loading branch information
darthunix committed Dec 7, 2025
commit e9858f97ffc6e36fe7309749b547717db6898c52
6 changes: 5 additions & 1 deletion ai/memory/gotchas.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ id: gotchas-0001
type: gotcha
scope: repo
tags: ["joins", "partitions", "varlena", "signals"]
updated_at: "2025-11-29"
updated_at: "2025-12-07"
importance: 0.7
---

Expand All @@ -13,3 +13,7 @@ importance: 0.7
- TOAST/Compressed varlena: in projection → error; when not projected → skip, don’t crash.
- SIGUSR1: requires a valid client PID; not available on non‑Unix.
- SHM races: don’t cache borrowed slices beyond one read cycle; clamp lengths.

## Executor borrow patterns

- Caching tuple `(off,len)` pairs in `PgScanStream`: fill a self-owned `Vec<(u16,u16)>` and create the iterator inside a tight scope. `Arc::clone` the needed metadata (`attrs_full`, `proj_indices`) into locals before the loop and avoid touching `self` while the iterator is alive. This sidesteps the borrow-checker conflict between the iterator's mutable borrow of `self.pairs` and immutable borrows of other `self` fields through the same `&mut self`.
56 changes: 31 additions & 25 deletions executor/src/pgscan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ pub struct PgScanStream {
proj_indices: Arc<Vec<usize>>,
rx: Option<mpsc::Receiver<HeapBlock>>,
conn_id: usize,
pairs: Vec<(u16, u16)>,
}

impl PgScanStream {
Expand All @@ -363,6 +364,7 @@ impl PgScanStream {
proj_indices,
rx,
conn_id,
pairs: Vec::new(),
}
}
}
Expand Down Expand Up @@ -392,45 +394,49 @@ impl Stream for PgScanStream {

// Create a HeapPage view and iterate tuples
let hp = unsafe { HeapPage::from_slice(page) };
let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema));
let Ok(hp) = hp else {
// On error decoding page, return empty batch for resilience
let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema));
return Poll::Ready(Some(Ok(batch)));
};

// Prepare per-column builders
let col_count = total_cols;
let mut builders = make_builders(&this.proj_schema, block.num_offsets as usize)
.map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?;
// Use tuples_by_offset to iterate LP_NORMAL tuples in page order
let mut pairs: Vec<(u16, u16)> = Vec::new();
// Populate pairs once and create iterator borrowing the filled pairs slice
let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
// Take local clones of shared metadata to avoid borrowing `this` during tuple iteration
let proj_indices = Arc::clone(&this.proj_indices);
let attrs_full = Arc::clone(&this.attrs_full);
let page_hdr = unsafe { &*(page.as_ptr() as *const pg_sys::PageHeaderData) }
as *const pg_sys::PageHeaderData;
let mut decoded_rows = 0usize;
for tup in it {
// Decode projected columns for tuple using iterator over requested projection
let iter = unsafe {
decode_tuple_project(
page_hdr,
tup,
&this.attrs_full,
this.proj_indices.iter().copied(),
)
};
let Ok(mut iter) = iter else {
continue;
};
// Iterate over projected columns in order
for b in builders.iter_mut().take(total_cols) {
match iter.next() {
Some(Ok(v)) => append_scalar(b, v),
Some(Err(_e)) => append_null(b),
None => append_null(b),
// Limit the borrow of `this.pairs` to this inner scope to satisfy borrow checker
{
// Populate pairs once and create iterator borrowing the filled pairs slice
let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut this.pairs);
for tup in it {
// Decode projected columns for tuple using iterator over requested projection
let iter = unsafe {
decode_tuple_project(
page_hdr,
tup,
&attrs_full,
proj_indices.iter().copied(),
)
};
let Ok(mut iter) = iter else {
continue;
};
// Iterate over projected columns in order
for b in builders.iter_mut().take(total_cols) {
match iter.next() {
Some(Ok(v)) => append_scalar(b, v),
Some(Err(_e)) => append_null(b),
None => append_null(b),
}
}
decoded_rows += 1;
}
decoded_rows += 1;
}

// Build Arrow arrays and RecordBatch
Expand Down
Loading