Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
send a separate descriptor message when the descriptor is provided
  • Loading branch information
Weijun-H committed Apr 21, 2023
commit c48c00f42777d2e40e78e0ae35e1d1b474d3b5fe
37 changes: 27 additions & 10 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};

use crate::{error::Result, FlightData, FlightDescriptor, SchemaAsIpc};
use crate::{error::Result, FlightData, FlightDescriptor, IpcMessage, SchemaAsIpc};
use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
Expand Down Expand Up @@ -173,7 +173,6 @@ impl FlightDataEncoderBuilder {
/// Stream that encodes a stream of record batches to flight data.
///
/// See [`FlightDataEncoderBuilder`] for details and example.
#[warn(dead_code)]
pub struct FlightDataEncoder {
/// Input stream
inner: BoxStream<'static, Result<RecordBatch>>,
Expand Down Expand Up @@ -211,13 +210,19 @@ impl FlightDataEncoder {
app_metadata: Some(app_metadata),
queue: VecDeque::new(),
done: false,
descriptor,
descriptor: None,
};

// If schema is known up front, enqueue it immediately
if let Some(schema) = schema {
encoder.encode_schema(&schema);
}

// If descriptor is known up front, enqueue it immediately
if let Some(descriptor) = descriptor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand the intent, the idea is that the descriptor is set on the first message that is sent. This code will potentially send a separate message after the schema message if the schema is known up front.

Perhaps you could check `self.descriptor` in `queue_messages` and attach the descriptor there.

encoder.encode_descriptor(&descriptor);
}

encoder
}

Expand All @@ -228,13 +233,7 @@ impl FlightDataEncoder {

/// Place the `FlightData` in the queue to send
fn queue_messages(&mut self, datas: impl IntoIterator<Item = FlightData>) {
let mut is_first = true;
for mut data in datas {
// The first message is the schema message need to set the descriptor
if is_first {
data.flight_descriptor = self.descriptor.clone();
is_first = !is_first;
}
for data in datas {
self.queue_message(data)
}
}
Expand All @@ -257,6 +256,24 @@ impl FlightDataEncoder {
schema
}

/// Encodes descriptor as a [`FlightData`] in self.queue.
/// Updates `self.descriptor` and returns the new descriptor
fn encode_descriptor(&mut self, descriptor: &FlightDescriptor) -> FlightDescriptor {
// The first message is the descriptor message, and all
// batches have the same descriptor
let descriptor = descriptor.clone();
let descriptor_flight_data = FlightData::new(
Some(descriptor.clone()),
IpcMessage(Bytes::new()),
vec![],
vec![],
);
self.queue_message(descriptor_flight_data);
// remember descriptor
self.descriptor = Some(descriptor.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the descriptor remembered?

descriptor
}

/// Encodes batch into one or more `FlightData` messages in self.queue
fn encode_batch(&mut self, batch: RecordBatch) -> Result<()> {
let schema = match &self.schema {
Expand Down