Skip to content

Commit 0059049

Browse files
authored
feat: set FlightDescriptor on FlightDataEncoderBuilder (#4101)
* feat: set FlightDescriptor on FlightDataEncoderBuilder * send a separate descriptor message when the descriptor is provided * include the flight descriptor in the first FlightData
1 parent f3b4a73 commit 0059049

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed

arrow-flight/src/encode.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll};
1919

20-
use crate::{error::Result, FlightData, SchemaAsIpc};
20+
use crate::{error::Result, FlightData, FlightDescriptor, SchemaAsIpc};
2121
use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
2222
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
2323
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
@@ -72,6 +72,8 @@ pub struct FlightDataEncoderBuilder {
7272
app_metadata: Bytes,
7373
/// Optional schema, if known before data.
7474
schema: Option<SchemaRef>,
75+
/// Optional flight descriptor, if known before data.
76+
descriptor: Option<FlightDescriptor>,
7577
}
7678

7779
/// Default target size for encoded [`FlightData`].
@@ -87,6 +89,7 @@ impl Default for FlightDataEncoderBuilder {
8789
options: IpcWriteOptions::default(),
8890
app_metadata: Bytes::new(),
8991
schema: None,
92+
descriptor: None,
9093
}
9194
}
9295
}
@@ -134,6 +137,15 @@ impl FlightDataEncoderBuilder {
134137
self
135138
}
136139

140+
/// Specify a flight descriptor in the first FlightData message.
141+
pub fn with_flight_descriptor(
142+
mut self,
143+
descriptor: Option<FlightDescriptor>,
144+
) -> Self {
145+
self.descriptor = descriptor;
146+
self
147+
}
148+
137149
/// Return a [`Stream`](futures::Stream) of [`FlightData`],
138150
/// consuming self. More details on [`FlightDataEncoder`]
139151
pub fn build<S>(self, input: S) -> FlightDataEncoder
@@ -145,6 +157,7 @@ impl FlightDataEncoderBuilder {
145157
options,
146158
app_metadata,
147159
schema,
160+
descriptor,
148161
} = self;
149162

150163
FlightDataEncoder::new(
@@ -153,6 +166,7 @@ impl FlightDataEncoderBuilder {
153166
max_flight_data_size,
154167
options,
155168
app_metadata,
169+
descriptor,
156170
)
157171
}
158172
}
@@ -176,6 +190,8 @@ pub struct FlightDataEncoder {
176190
queue: VecDeque<FlightData>,
177191
/// Is this stream done (inner is empty or errored)
178192
done: bool,
193+
/// cleared after the first FlightData message is sent
194+
descriptor: Option<FlightDescriptor>,
179195
}
180196

181197
impl FlightDataEncoder {
@@ -185,6 +201,7 @@ impl FlightDataEncoder {
185201
max_flight_data_size: usize,
186202
options: IpcWriteOptions,
187203
app_metadata: Bytes,
204+
descriptor: Option<FlightDescriptor>,
188205
) -> Self {
189206
let mut encoder = Self {
190207
inner,
@@ -194,17 +211,22 @@ impl FlightDataEncoder {
194211
app_metadata: Some(app_metadata),
195212
queue: VecDeque::new(),
196213
done: false,
214+
descriptor,
197215
};
198216

199217
// If schema is known up front, enqueue it immediately
200218
if let Some(schema) = schema {
201219
encoder.encode_schema(&schema);
202220
}
221+
203222
encoder
204223
}
205224

206225
/// Place the `FlightData` in the queue to send
207-
fn queue_message(&mut self, data: FlightData) {
226+
fn queue_message(&mut self, mut data: FlightData) {
227+
if let Some(descriptor) = self.descriptor.take() {
228+
data.flight_descriptor = Some(descriptor);
229+
}
208230
self.queue.push_back(data);
209231
}
210232

arrow-flight/tests/encode_decode.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc};
2222
use arrow_array::types::Int32Type;
2323
use arrow_array::{ArrayRef, DictionaryArray, Float64Array, RecordBatch, UInt8Array};
2424
use arrow_cast::pretty::pretty_format_batches;
25+
use arrow_flight::flight_descriptor::DescriptorType;
26+
use arrow_flight::FlightDescriptor;
2527
use arrow_flight::{
2628
decode::{DecodedPayload, FlightDataDecoder, FlightRecordBatchStream},
2729
encode::FlightDataEncoderBuilder,
@@ -136,6 +138,29 @@ async fn test_zero_batches_schema_specified() {
136138
assert_eq!(decoder.schema(), Some(&schema));
137139
}
138140

141+
#[tokio::test]
142+
async fn test_with_flight_descriptor() {
143+
let stream = futures::stream::iter(vec![Ok(make_dictionary_batch(5))]);
144+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
145+
146+
let descriptor = Some(FlightDescriptor {
147+
r#type: DescriptorType::Path.into(),
148+
path: vec!["table_name".to_string()],
149+
cmd: Bytes::default(),
150+
});
151+
152+
let encoder = FlightDataEncoderBuilder::default()
153+
.with_schema(schema.clone())
154+
.with_flight_descriptor(descriptor.clone());
155+
156+
let mut encoder = encoder.build(stream);
157+
158+
// First batch should be the schema
159+
let first_batch = encoder.next().await.unwrap().unwrap();
160+
161+
assert_eq!(first_batch.flight_descriptor, descriptor);
162+
}
163+
139164
#[tokio::test]
140165
async fn test_zero_batches_dictionary_schema_specified() {
141166
let schema = Arc::new(Schema::new(vec![

0 commit comments

Comments
 (0)