diff --git a/Cargo.toml b/Cargo.toml
index 2c19d422d1861..b1f07aa531df3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -154,7 +154,6 @@ large_futures = "warn"
 
 [workspace.lints.rust]
 unused_imports = "deny"
 
-
 ## Temporary arrow-rs patch until 52.2.0 is released
 [patch.crates-io]
diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index 41dffc55f371e..34004d9fb1c80 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -116,7 +116,9 @@ impl RunOpt {
             None => queries.min_query_id()..=queries.max_query_id(),
         };
 
-        let config = self.common.config();
+        let mut config = self.common.config();
+        config.options_mut().execution.schema_force_string_view = self.common.string_view;
+
         let ctx = SessionContext::new_with_config(config);
 
         self.register_hits(&ctx).await?;
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index f2a93d2ea5495..3a31f48a4bd43 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -120,6 +120,7 @@ impl RunOpt {
             .config()
             .with_collect_statistics(!self.disable_statistics);
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
+        config.options_mut().execution.schema_force_string_view = self.common.string_view;
         let ctx = SessionContext::new_with_config(config);
 
         // register tables
@@ -339,6 +340,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
+            string_view: false,
         };
         let opt = RunOpt {
             query: Some(query),
@@ -372,6 +374,7 @@ mod tests {
             partitions: Some(2),
             batch_size: 8192,
             debug: false,
+            string_view: false,
         };
         let opt = RunOpt {
             query: Some(query),
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index b9398e5b522f2..02591e293272e 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -37,6 +37,11 @@ pub struct CommonOpt {
     /// Activate debug mode to see more details
     #[structopt(short, long)]
     pub debug: bool,
+
+    /// If true, will use StringView/BinaryViewArray instead of String/BinaryArray
+    /// when reading ParquetFiles
+    #[structopt(long)]
+    pub string_view: bool,
 }
 
 impl CommonOpt {
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 880f0119ce0da..5f5b441bd6066 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -311,6 +311,10 @@ config_namespace! {
 
         /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false
+
+        /// If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`,
+        /// and `Binary/BinaryLarge` with `BinaryView`.
+        pub schema_force_string_view: bool, default = false
     }
 }
 
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index ea4d396a14cb3..0142d42767721 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -410,7 +410,32 @@ impl ListingOptions {
             .try_collect()
             .await?;
 
-        self.format.infer_schema(state, &store, &files).await
+        let mut schema = self.format.infer_schema(state, &store, &files).await?;
+
+        if state.config_options().execution.schema_force_string_view {
+            let transformed_fields: Vec<Arc<Field>> = schema
+                .fields
+                .iter()
+                .map(|field| match field.data_type() {
+                    DataType::Utf8 | DataType::LargeUtf8 => Arc::new(Field::new(
+                        field.name(),
+                        DataType::Utf8View,
+                        field.is_nullable(),
+                    )),
+                    DataType::Binary | DataType::LargeBinary => Arc::new(Field::new(
+                        field.name(),
+                        DataType::BinaryView,
+                        field.is_nullable(),
+                    )),
+                    _ => field.clone(),
+                })
+                .collect();
+            schema = Arc::new(Schema::new_with_metadata(
+                transformed_fields,
+                schema.metadata.clone(),
+            ));
+        }
+        Ok(schema)
     }
 
     /// Infers the partition columns stored in `LOCATION` and compares
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 95bea1223a9ce..9be3e9fd4146d 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -205,6 +205,7 @@ datafusion.execution.parquet.statistics_enabled NULL
 datafusion.execution.parquet.write_batch_size 1024
 datafusion.execution.parquet.writer_version 1.0
 datafusion.execution.planning_concurrency 13
+datafusion.execution.schema_force_string_view false
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
 datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -289,6 +290,7 @@ datafusion.execution.parquet.statistics_enabled NULL Sets if statistics are enab
 datafusion.execution.parquet.write_batch_size 1024 Sets write_batch_size in bytes
 datafusion.execution.parquet.writer_version 1.0 Sets parquet writer version valid values are "1.0" and "2.0"
 datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
+datafusion.execution.schema_force_string_view false If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 5130b0a56d0e9..106f4369f970e 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -87,6 +87,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
 | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental |
 | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches |
+| datafusion.execution.schema_force_string_view | false | If true, listing tables will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
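
For anyone wanting to try the new option outside the benchmark harness, here is a minimal sketch (not part of this patch) of how `schema_force_string_view` could be toggled through the public `SessionConfig` API; the table name `hits` and the path `data/hits.parquet` are illustrative placeholders.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Flip the new execution option before building the context,
    // mirroring the wiring added to the benchmarks in this patch.
    let mut config = SessionConfig::new();
    config.options_mut().execution.schema_force_string_view = true;

    let ctx = SessionContext::new_with_config(config);

    // Hypothetical local file: any Parquet file with Utf8/Binary columns works.
    ctx.register_parquet("hits", "data/hits.parquet", ParquetReadOptions::default())
        .await?;

    // With the option enabled, listing-table schema inference should report
    // Utf8View/BinaryView for the string/binary columns.
    ctx.sql("DESCRIBE hits").await?.show().await?;
    Ok(())
}
```

The same toggle is exposed on the benchmark binaries via the new `--string-view` flag, and it should also be settable at runtime with `SET datafusion.execution.schema_force_string_view = true;`.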