7 changes: 3 additions & 4 deletions arrow-ipc/benches/ipc_reader.rs
@@ -142,7 +142,7 @@ fn criterion_benchmark(c: &mut Criterion) {
// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
let mut decoder = IPCBufferDecoder::new(buffer);
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
@@ -160,8 +160,7 @@ fn criterion_benchmark(c: &mut Criterion) {
// Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
let bytes = bytes::Bytes::from_owner(mmap);
let buffer = Buffer::from(bytes);
let decoder = IPCBufferDecoder::new(buffer);
let decoder = unsafe { decoder.with_skip_validation(true) };
let mut decoder = unsafe { IPCBufferDecoder::new(buffer).with_skip_validation(true) };
assert_eq!(decoder.num_batches(), 10);

for i in 0..decoder.num_batches() {
@@ -248,7 +247,7 @@ impl IPCBufferDecoder {
self.batches.len()
}

fn get_batch(&self, i: usize) -> RecordBatch {
fn get_batch(&mut self, i: usize) -> RecordBatch {
let block = &self.batches[i];
let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
let data = self
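The benchmark bindings above change to `mut` because the decoder's read path can now mutate internal state: pending dictionary deltas are folded into the dictionary cache the first time a record batch needs them. A minimal standalone sketch of that calling pattern follows; `LazyDict` and its `get` method are illustrative stand-ins, not the benchmark's actual `IPCBufferDecoder`.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for a decoder whose read path mutates internal state.
struct LazyDict {
    values: HashMap<i64, Vec<String>>,       // materialized dictionaries
    pending: HashMap<i64, Vec<Vec<String>>>, // buffered delta chunks
}

impl LazyDict {
    /// Reading folds any pending deltas in first, hence `&mut self`,
    /// which is why callers now need a `mut` binding.
    fn get(&mut self, id: i64) -> Option<&Vec<String>> {
        if let Some(deltas) = self.pending.remove(&id) {
            let entry = self.values.entry(id).or_default();
            for chunk in deltas {
                entry.extend(chunk);
            }
        }
        self.values.get(&id)
    }
}

fn main() {
    let mut dict = LazyDict {
        values: HashMap::from([(1, vec!["A".to_string()])]),
        pending: HashMap::from([(1, vec![vec!["B".to_string()]])]),
    };
    // A `mut` binding is required, mirroring the benchmark change above.
    assert_eq!(
        dict.get(1).unwrap(),
        &vec!["A".to_string(), "B".to_string()]
    );
}
```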
113 changes: 96 additions & 17 deletions arrow-ipc/src/reader.rs
@@ -781,24 +781,13 @@ fn read_dictionary_impl(
Ok(())
}

/// Updates the `dictionaries_by_id` with the provided dictionary values and id.
///
/// # Errors
/// - If `is_delta` is true and there is no existing dictionary for the given
/// `dict_id`
/// - If `is_delta` is true and the concatenation of the existing and new
/// dictionary fails. This usually signals a type mismatch between the old and
/// new values.
fn update_dictionaries(
dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
is_delta: bool,
dict_id: i64,
dict_values: ArrayRef,
) -> Result<(), ArrowError> {
if !is_delta {
// We don't currently record the isOrdered field. This could be general
// attributes of arrays.
// Add (possibly multiple) array refs to the dictionaries array.
dictionaries_by_id.insert(dict_id, dict_values.clone());
return Ok(());
}
@@ -818,6 +807,70 @@ fn update_dictionaries(
Ok(())
}

/// Internal lazy update helper used by StreamReader / FileDecoder / StreamDecoder.
///
/// Keeps the decode boundary as `HashMap<i64, ArrayRef>` and stores only pending
/// delta chunks separately until a record batch actually needs them.
fn update_dictionaries_lazy(
dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
pending_dictionary_deltas: &mut HashMap<i64, Vec<ArrayRef>>,
is_delta: bool,
dict_id: i64,
dict_values: ArrayRef,
) -> Result<(), ArrowError> {
if !is_delta {
dictionaries_by_id.insert(dict_id, dict_values);
pending_dictionary_deltas.remove(&dict_id);
return Ok(());
}

if !dictionaries_by_id.contains_key(&dict_id) {
return Err(ArrowError::InvalidArgumentError(format!(
"No existing dictionary for delta dictionary with id '{dict_id}'"
)));
}

pending_dictionary_deltas
.entry(dict_id)
.or_default()
.push(dict_values);

Ok(())
}

fn materialize_dictionary_deltas(
dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
pending_dictionary_deltas: &mut HashMap<i64, Vec<ArrayRef>>,
) -> Result<(), ArrowError> {
if pending_dictionary_deltas.is_empty() {
return Ok(());
}

let pending = std::mem::take(pending_dictionary_deltas);

for (dict_id, deltas) in pending {
let existing = dictionaries_by_id.get(&dict_id).ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"No existing dictionary for delta dictionary with id '{dict_id}'"
))
})?;

let mut arrays: Vec<&dyn Array> = Vec::with_capacity(1 + deltas.len());
arrays.push(existing.as_ref());
for delta in &deltas {
arrays.push(delta.as_ref());
}

let combined = concat::concat(&arrays).map_err(|e| {
ArrowError::InvalidArgumentError(format!("Failed to concat delta dictionary: {e}"))
})?;

dictionaries_by_id.insert(dict_id, combined);
}

Ok(())
}

/// Given a dictionary batch IPC message/body along with the full state of a
/// stream including schema, dictionary cache, metadata, and other flags, this
/// function will parse the buffer into an array of dictionary values.
@@ -977,6 +1030,7 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
pub struct FileDecoder {
schema: SchemaRef,
dictionaries: HashMap<i64, ArrayRef>,
pending_dictionary_deltas: HashMap<i64, Vec<ArrayRef>>,
version: MetadataVersion,
projection: Option<Vec<usize>>,
require_alignment: bool,
@@ -990,6 +1044,7 @@ impl FileDecoder {
schema,
version,
dictionaries: Default::default(),
pending_dictionary_deltas: Default::default(),
projection: None,
require_alignment: false,
skip_validation: UnsafeFlag::new(),
@@ -1052,14 +1107,23 @@ impl FileDecoder {
match message.header_type() {
crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();
read_dictionary_impl(

let dict_values = get_dictionary_values(
&buf.slice(block.metaDataLength() as _),
batch,
&self.schema,
&mut self.dictionaries,
&message.version(),
self.require_alignment,
self.skip_validation.clone(),
)?;

update_dictionaries_lazy(
&mut self.dictionaries,
&mut self.pending_dictionary_deltas,
batch.isDelta(),
batch.id(),
dict_values,
)
}
t => Err(ArrowError::ParseError(format!(
@@ -1070,7 +1134,7 @@

/// Read the RecordBatch with the given block and data buffer
pub fn read_record_batch(
&self,
&mut self,
block: &Block,
buf: &Buffer,
) -> Result<Option<RecordBatch>, ArrowError> {
@@ -1080,10 +1144,14 @@
"Not expecting a schema when messages are read".to_string(),
)),
crate::MessageHeader::RecordBatch => {
materialize_dictionary_deltas(
&mut self.dictionaries,
&mut self.pending_dictionary_deltas,
)?;

let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
RecordBatchDecoder::try_new(
&buf.slice(block.metaDataLength() as _),
batch,
@@ -1480,6 +1548,8 @@ pub struct StreamReader<R> {
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_id: HashMap<i64, ArrayRef>,

pending_dictionary_deltas: HashMap<i64, Vec<ArrayRef>>,

/// An indicator of whether the stream is complete.
///
/// This value is set to `true` the first time the reader's `next()` returns `None`.
@@ -1553,6 +1623,7 @@ impl<R: Read> StreamReader<R> {

// Create an array of optional dictionary value arrays, one per field.
let dictionaries_by_id = HashMap::new();
let pending_dictionary_deltas = HashMap::new();

let projection = match projection {
Some(projection_indices) => {
@@ -1567,6 +1638,7 @@
schema: Arc::new(schema),
finished: false,
dictionaries_by_id,
pending_dictionary_deltas,
projection,
skip_validation: UnsafeFlag::new(),
})
@@ -1644,6 +1716,11 @@ impl<R: Read> StreamReader<R> {
IpcMessage::Schema(arrow_schema)
}
Message::MessageHeader::RecordBatch => {
materialize_dictionary_deltas(
&mut self.dictionaries_by_id,
&mut self.pending_dictionary_deltas,
)?;

let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
})?;
@@ -1681,19 +1758,21 @@ impl<R: Read> StreamReader<R> {
self.skip_validation.clone(),
)?;

update_dictionaries(
update_dictionaries_lazy(
&mut self.dictionaries_by_id,
&mut self.pending_dictionary_deltas,
dict.isDelta(),
dict.id(),
dict_values.clone(),
)?;

IpcMessage::DictionaryBatch {
id: dict.id(),
is_delta: (dict.isDelta()),
values: (dict_values),
is_delta: dict.isDelta(),
values: dict_values,
}
}

x => {
return Err(ArrowError::ParseError(format!(
"Unsupported message header type in IPC stream: '{x:?}'"
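The two helpers added to `reader.rs` split dictionary handling into a buffering step and a materialization step: `update_dictionaries_lazy` applies replacement dictionaries immediately but only records delta chunks (rejecting a delta whose id has no existing dictionary), and `materialize_dictionary_deltas` concatenates the existing values with the buffered deltas, in arrival order, the first time a record batch is decoded. A simplified sketch of that flow follows; `update_lazy`, `materialize`, and `DictValues` are stand-ins that use `String` values and `Vec::extend` where the real code works on Arrow `ArrayRef`s and `concat`.

```rust
use std::collections::HashMap;

type DictValues = Vec<String>; // simplified stand-in for Arrow's ArrayRef

/// Buffer a dictionary message: replacements apply immediately, deltas are
/// only recorded until a record batch needs them.
fn update_lazy(
    dicts: &mut HashMap<i64, DictValues>,
    pending: &mut HashMap<i64, Vec<DictValues>>,
    is_delta: bool,
    id: i64,
    values: DictValues,
) -> Result<(), String> {
    if !is_delta {
        dicts.insert(id, values);
        pending.remove(&id);
        return Ok(());
    }
    if !dicts.contains_key(&id) {
        return Err(format!("No existing dictionary for delta with id '{id}'"));
    }
    pending.entry(id).or_default().push(values);
    Ok(())
}

/// Fold all buffered deltas into the materialized dictionaries, in arrival
/// order, using `extend` where the real code concatenates Arrow arrays.
fn materialize(
    dicts: &mut HashMap<i64, DictValues>,
    pending: &mut HashMap<i64, Vec<DictValues>>,
) {
    for (id, deltas) in std::mem::take(pending) {
        let entry = dicts.entry(id).or_default();
        for delta in deltas {
            entry.extend(delta);
        }
    }
}

fn main() {
    let id: i64 = 1;
    let (mut dicts, mut pending) = (HashMap::new(), HashMap::new());

    // Initial dictionary, then two delta messages before any record batch.
    update_lazy(&mut dicts, &mut pending, false, id, vec!["A".into()]).unwrap();
    update_lazy(&mut dicts, &mut pending, true, id, vec!["B".into()]).unwrap();
    update_lazy(&mut dicts, &mut pending, true, id, vec!["C".into()]).unwrap();
    assert_eq!(dicts[&id], vec!["A"]); // deltas not applied yet

    // First record batch arrives: deltas are materialized once, in order.
    materialize(&mut dicts, &mut pending);
    assert_eq!(dicts[&id], vec!["A", "B", "C"]);
}
```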
24 changes: 22 additions & 2 deletions arrow-ipc/src/reader/stream.rs
@@ -25,7 +25,10 @@ use arrow_data::UnsafeFlag;
use arrow_schema::{ArrowError, SchemaRef};

use crate::convert::MessageBuffer;
use crate::reader::{RecordBatchDecoder, read_dictionary_impl};
use crate::reader::{
RecordBatchDecoder, get_dictionary_values, materialize_dictionary_deltas,
update_dictionaries_lazy,
};
use crate::{CONTINUATION_MARKER, MessageHeader};

/// A low-level interface for reading [`RecordBatch`] data from a stream of bytes
@@ -37,6 +40,7 @@ pub struct StreamDecoder {
schema: Option<SchemaRef>,
/// Lookup table for dictionaries by ID
dictionaries: HashMap<i64, ArrayRef>,
pending_dictionary_deltas: HashMap<i64, Vec<ArrayRef>>,
/// The decoder state
state: DecoderState,
/// A scratch buffer when a read is split across multiple `Buffer`
@@ -222,6 +226,11 @@ impl StreamDecoder {
self.schema = Some(Arc::new(schema));
}
MessageHeader::RecordBatch => {
materialize_dictionary_deltas(
&mut self.dictionaries,
&mut self.pending_dictionary_deltas,
)?;

let batch = message.header_as_record_batch().unwrap();
let schema = self.schema.clone().ok_or_else(|| {
ArrowError::IpcError("Missing schema".to_string())
@@ -234,6 +243,7 @@
&version,
)?
.with_require_alignment(self.require_alignment)
.with_skip_validation(self.skip_validation.clone())
.read_record_batch()?;
self.state = DecoderState::default();
return Ok(Some(batch));
@@ -243,7 +253,8 @@
let schema = self.schema.as_deref().ok_or_else(|| {
ArrowError::IpcError("Missing schema".to_string())
})?;
read_dictionary_impl(

let dictionary_values = get_dictionary_values(
&body,
dictionary,
schema,
@@ -252,6 +263,15 @@
self.require_alignment,
self.skip_validation.clone(),
)?;

update_dictionaries_lazy(
&mut self.dictionaries,
&mut self.pending_dictionary_deltas,
dictionary.isDelta(),
dictionary.id(),
dictionary_values,
)?;

self.state = DecoderState::default();
}
MessageHeader::NONE => {
55 changes: 55 additions & 0 deletions arrow-ipc/src/tests/delta_dictionary.rs
@@ -224,6 +224,61 @@ fn test_multi_same_value_sequence() {
);
}

#[test]
fn test_multiple_deltas_before_record_batch() {
let schema = Arc::new(Schema::new(vec![Field::new_dictionary(
"dict_col",
DataType::Int32,
DataType::Utf8,
false,
)]));

let mut builder = StringDictionaryBuilder::<Int32Type>::new();

let batch1 = build_batch(&["A"], &mut builder);
let batch2 = build_batch(&["A", "B"], &mut builder);
let batch3 = build_batch(&["A", "B", "C"], &mut builder);

let mut buf = Vec::new();
let opts = IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, opts).unwrap();
writer.write(&batch1).unwrap();
writer.write(&batch2).unwrap();
writer.write(&batch3).unwrap();
writer.finish().unwrap();

let mut reader = StreamReader::try_new(Cursor::new(buf), None).unwrap();

let mut seen = Vec::new();
while let Some(message) = reader.next_ipc_message().unwrap() {
seen.push(message);
}

// Existing tests already validate message semantics generally.
// This specifically guards the "delta burst before use" case.
let batches: Vec<_> = seen
.into_iter()
.filter_map(|m| match m {
IpcMessage::RecordBatch(b) => Some(b),
_ => None,
})
.collect();

assert_eq!(batches.len(), 3);
assert_eq!(
dict_to_vec(extract_dictionary(batches[0].clone())),
vec!["A"]
);
assert_eq!(
dict_to_vec(extract_dictionary(batches[1].clone())),
vec!["A", "B"]
);
assert_eq!(
dict_to_vec(extract_dictionary(batches[2].clone())),
vec!["A", "B", "C"]
);
}

#[derive(Debug, PartialEq)]
enum MessageType {
Schema,