feat: extend arrow reader instrumentation to capture row filtering stats in the trace
sdd committed Jul 19, 2025
commit 50f607dc970395d64ff2dbcb2416500a3142cdfd
68 changes: 68 additions & 0 deletions crates/iceberg/src/arrow/count_recording_record_batch_stream.rs
@@ -0,0 +1,68 @@
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};

use arrow_array::RecordBatch;
use futures::Stream;

use crate::Result;

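/// Wraps a `RecordBatch` stream, counting the record batches and rows that flow
/// through it. The totals are recorded on `target_span` under the supplied field
/// names when the stream is dropped.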
pub(crate) struct CountRecordingRecordBatchStream<S> {
stream: S,
row_count: AtomicU64,
record_batch_count: AtomicU64,
target_span: tracing::Span,
record_batch_count_field_name: &'static str,
row_count_field_name: &'static str,
}

impl<S> CountRecordingRecordBatchStream<S> {
pub(crate) fn new(
stream: S,
target_span: tracing::Span,
record_batch_count_field_name: &'static str,
row_count_field_name: &'static str,
) -> Self {
Self {
stream,
row_count: AtomicU64::new(0),
record_batch_count: AtomicU64::new(0),
target_span,
record_batch_count_field_name,
row_count_field_name,
}
}
}

impl<S> Stream for CountRecordingRecordBatchStream<S>
where S: Stream<Item = Result<RecordBatch>> + Unpin
{
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

match Pin::new(&mut this.stream).poll_next(cx) {
Poll::Ready(Some(Ok(batch))) => {
let row_count = batch.num_rows() as u64;

this.row_count.fetch_add(row_count, Ordering::Relaxed);
this.record_batch_count.fetch_add(1, Ordering::Relaxed);

Poll::Ready(Some(Ok(batch)))
}
other => other,
}
}
}

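// When the wrapper is dropped (i.e. the scan of this data file has finished or
// been abandoned), write the accumulated totals into the target span. The span
// must declare these field names up front for `record` to have any effect.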
impl<S> Drop for CountRecordingRecordBatchStream<S> {
fn drop(&mut self) {
let total_record_batches = self.record_batch_count.load(Ordering::Relaxed);
let total_rows = self.row_count.load(Ordering::Relaxed);
self.target_span
.record(self.record_batch_count_field_name, total_record_batches);
self.target_span
.record(self.row_count_field_name, total_rows);
}
}
1 change: 1 addition & 0 deletions crates/iceberg/src/arrow/mod.rs
@@ -27,6 +27,7 @@ pub(crate) mod caching_delete_file_loader;
pub mod delete_file_loader;
pub(crate) mod delete_filter;

pub(crate) mod count_recording_record_batch_stream;
mod reader;
pub(crate) mod record_batch_projector;
pub(crate) mod record_batch_transformer;
78 changes: 77 additions & 1 deletion crates/iceberg/src/arrow/reader.rs
@@ -44,6 +44,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use tracing::Instrument;

use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
use crate::arrow::count_recording_record_batch_stream::CountRecordingRecordBatchStream;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::delete_vector::DeleteVector;
@@ -179,7 +180,17 @@ impl ArrowReader {
#[tracing::instrument(
skip_all,
level = "debug",
name = "iceberg.scan.execute.process_data_file"
name = "iceberg.scan.execute.process_data_file",
fields(
iceberg.scan.execute.data_file.data_file_path = %task.data_file_path,
iceberg.scan.execute.data_file.row_groups.skipped_count,
iceberg.scan.execute.data_file.row_groups.selected_count,
iceberg.scan.execute.data_file.row_groups.selected_row_count,
iceberg.scan.execute.data_file.row_groups.skipped_row_count,
iceberg.scan.execute.data_file.row_selection.skipped_count,
iceberg.scan.execute.data_file.row_selection.selected_count,
iceberg.scan.execute.data_file.rows.scanned_count,
iceberg.scan.execute.data_file.rows.selected_count,
iceberg.scan.execute.data_file.record_batches.count,
)
)]
async fn process_file_scan_task(
task: FileScanTask,
@@ -255,6 +266,8 @@ impl ArrowReader {
// by using a `RowSelection`.
let mut selected_row_group_indices = None;
let mut row_selection = None;
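// Track how many rows are expected to survive row-group pruning and row
// selection so the totals can be recorded on the current span below.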
let mut selected_row_count: Option<u64> = task.record_count;
let mut skipped_row_count: u64 = 0;

if let Some(predicate) = final_predicate {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
@@ -278,6 +291,43 @@
&task.schema,
)?;

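// Record how many row groups were selected vs. pruned by the predicate, along
// with the number of rows contained in each set.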
tracing::Span::current().record(
"iceberg.scan.execute.data_file.row_groups.selected_count",
result.len(),
);
tracing::Span::current().record(
"iceberg.scan.execute.data_file.row_groups.skipped_count",
record_batch_stream_builder.metadata().row_groups().len() - result.len(),
);
selected_row_count = Some(
result
.iter()
.map(|&row_group_index| {
record_batch_stream_builder
.metadata()
.row_group(row_group_index)
.num_rows() as u64
})
.sum::<u64>(),
);
tracing::Span::current().record(
"iceberg.scan.execute.data_file.row_groups.selected_row_count",
selected_row_count.unwrap(),
);
skipped_row_count += (0..record_batch_stream_builder.metadata().row_groups().len())
.filter(|&i| !result.contains(&i))
.map(|i| {
record_batch_stream_builder
.metadata()
.row_group(i)
.num_rows() as u64
})
.sum::<u64>();
tracing::Span::current().record(
"iceberg.scan.execute.data_file.row_groups.skipped_row_count",
skipped_row_count,
);

selected_row_group_indices = Some(result);
}

@@ -316,6 +366,20 @@ impl ArrowReader {
}

if let Some(row_selection) = row_selection {
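// Record how many rows the row selection keeps, and how many of the
// previously selected rows it skips.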
let new_selected_row_count = row_selection.row_count() as u64;
tracing::Span::current().record(
"iceberg.scan.execute.data_file.row_selection.selected_count",
new_selected_row_count,
);

if let Some(selected_row_count) = selected_row_count {
tracing::Span::current().record(
"iceberg.scan.execute.data_file.row_selection.skipped_count",
selected_row_count - new_selected_row_count,
);
}
selected_row_count = Some(new_selected_row_count);

record_batch_stream_builder =
record_batch_stream_builder.with_row_selection(row_selection);
}
@@ -325,6 +389,11 @@
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
}

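// The number of rows that will actually be scanned from the file after
// row-group pruning and any row selection have been applied.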
tracing::Span::current().record(
"iceberg.scan.execute.data_file.rows.scanned_count",
selected_row_count,
);

let stream_span = tracing::trace_span!("iceberg.scan.execute.process_data_file_stream");
let stream_span_clone = stream_span.clone();
let _guard = stream_span.enter();
@@ -339,6 +408,13 @@
});
drop(_guard);

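// Wrap the stream so that the record batches and rows actually yielded to the
// caller are counted and recorded on the span when the stream is dropped.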
let stream = Box::pin(CountRecordingRecordBatchStream::new(
stream,
span.clone(),
"iceberg.scan.execute.data_file.record_batches.count",
"iceberg.scan.execute.data_file.rows.selected_count",
));

Ok(Box::pin(TracedStream::new(stream, vec![
parent_span,
span,
4 changes: 4 additions & 0 deletions crates/integration_tests/src/lib.rs
@@ -43,6 +43,10 @@ pub fn set_test_fixture(func: &str, set_up_tracing_subscriber: bool) -> TestFixt
docker_compose.down();
docker_compose.up();
} else {
// check if the containers are running and start them if not
if !docker_compose.is_running() {
docker_compose.up();
}
docker_compose.keep_running();
}

82 changes: 77 additions & 5 deletions crates/integration_tests/testdata/spark/provision.py
@@ -29,6 +29,16 @@
spark = (
SparkSession
.builder
.appName("IcebergDemo")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.apache.iceberg:iceberg-aws-bundle:1.5.0")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
# .config("spark.sql.catalog.rest.type", "rest")
# .config("spark.sql.catalog.rest.uri", "http://rest:8181")
# .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
# .config("spark.sql.catalog.rest.warehouse", "s3://icebergdata/demo")
# .config("spark.sql.catalog.rest.s3.endpoint", "http://minio:9000")
# .config("spark.sql.defaultCatalog", "rest")
.config("spark.sql.shuffle.partitions", "200")
.config("spark.default.parallelism", "200")
.config("spark.sql.adaptive.enabled", "true")
@@ -196,16 +206,19 @@
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read',
'format-version'='2'
'format-version'='2',
'write.parquet.row-group-size-bytes'='33554432',
'write.target-file-size-bytes'='134217728',
'write.parquet.page-row-limit'='20000'
)
"""
)

print("Table created. Generating fake taxi trip data...")

# Generate data in batches to avoid memory issues
BATCH_SIZE = 100000
TOTAL_ROWS = 1000000
BATCH_SIZE = 100_000
TOTAL_ROWS = 10_000_000
NUM_BATCHES = TOTAL_ROWS // BATCH_SIZE

# NYC coordinates boundaries (approximate)
@@ -379,7 +392,66 @@ def generate_nyc_coordinates():
rate = rows_processed / elapsed_time if elapsed_time > 0 else 0
print(f"Inserted {rows_processed:,} rows. Rate: {rate:,.0f} rows/sec")

print(f"\nCompleted! Generated {TOTAL_ROWS:,} taxi trip records.")
print(f"\nCompleted! Generated {TOTAL_ROWS:,} taxi trip records. Rewriting data files to apply sort")

print("Checking available system procedures...")

try:
# Check if we can see system procedures
result = spark.sql("SHOW PROCEDURES").collect()
print("Available procedures:")
for row in result:
print(f" {row}")
except Exception as e:
print(f"Could not show procedures: {e}")

try:
# Try to show procedures from the rest catalog specifically
result = spark.sql("SHOW PROCEDURES rest.system").collect()
print("Rest catalog procedures:")
for row in result:
print(f" {row}")
except Exception as e:
print(f"Could not show rest.system procedures: {e}")

# Enable more verbose logging to see what's happening
spark.sparkContext.setLogLevel("DEBUG")

print("Attempting rewrite with more verbose error handling...")

try:
result = spark.sql("""
CALL rest.system.rewrite_data_files(
'default.nyc_taxi_trips',
'sort',
'zorder(pickup_latitude, pickup_longitude)'
)
""")
print("Rewrite successful!")
result.show()
except Exception as e:
print(f"Detailed error: {e}")
import traceback
traceback.print_exc()

# Try alternative syntax variations
alternative_calls = [
"CALL system.rewrite_data_files('default.nyc_taxi_trips')",
"CALL rest.system.rewrite_data_files('default.nyc_taxi_trips')",
"CALL rest.system.rewrite_data_files(table => 'default.nyc_taxi_trips')",
]

for call_sql in alternative_calls:
try:
print(f"Trying: {call_sql}")
result = spark.sql(call_sql)
print("Success!")
result.show()
break
except Exception as e:
print(f"Failed: {e}")

print(f"Total time: {time.time() - start_time:.2f} seconds")

spark.stop()

spark.stop()
13 changes: 10 additions & 3 deletions crates/integration_tests/tests/shared_tests/observability.rs
@@ -36,9 +36,16 @@ async fn test_observability() -> Result<(), IcebergError> {
.load_table(&TableIdent::from_strs(["default", "nyc_taxi_trips"]).unwrap())
.await?;

let predicate = Reference::new("vendor_id").equal_to(Datum::long(1));

let scan = table.scan().with_filter(predicate).build()?;
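// Filter on the z-ordered pickup coordinates as well as vendor_id so that
// row-group pruning and row selection statistics show up in the trace.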
let predicate = Reference::new("vendor_id")
.equal_to(Datum::long(1))
.and(Reference::new("pickup_longitude").greater_than(Datum::double(-74.0)))
.and(Reference::new("pickup_latitude").less_than(Datum::double(40.7)));

let scan = table
.scan()
.with_filter(predicate)
.with_row_selection_enabled(true)
.build()?;

let results = scan.to_arrow().await?.try_collect::<Vec<_>>().await?;
assert!(!results.is_empty());