diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 7eef17b4e3362..6f5393361bf15 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -701,12 +701,14 @@ name = "datafusion-cli" version = "15.0.0" dependencies = [ "arrow", + "async-trait", "clap", "datafusion", "dirs", "env_logger", "mimalloc", "object_store", + "parking_lot", "rustyline", "tokio", "url", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 2f61d38839bf4..07711eaf1eb7a 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -30,12 +30,14 @@ readme = "README.md" [dependencies] arrow = "29.0.0" +async-trait = "0.1.41" clap = { version = "3", features = ["derive", "cargo"] } datafusion = { path = "../datafusion/core", version = "15.0.0" } dirs = "4.0.0" env_logger = "0.9" mimalloc = { version = "0.1", default-features = false } object_store = { version = "0.5.0", features = ["aws", "gcp"] } +parking_lot = { version = "0.12" } rustyline = "10.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } url = "2.2" diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs new file mode 100644 index 0000000000000..dbd6751a4f769 --- /dev/null +++ b/datafusion-cli/src/catalog.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use datafusion::catalog::catalog::{CatalogList, CatalogProvider}; +use datafusion::catalog::schema::SchemaProvider; +use datafusion::datasource::listing::{ + ListingTable, ListingTableConfig, ListingTableUrl, +}; +use datafusion::datasource::TableProvider; +use datafusion::error::Result; +use datafusion::execution::context::SessionState; +use parking_lot::RwLock; +use std::any::Any; +use std::sync::{Arc, Weak}; + +/// Wraps another catalog, automatically creating table providers +/// for local files if needed +pub struct DynamicFileCatalog { + inner: Arc, + state: Weak>, +} + +impl DynamicFileCatalog { + pub fn new(inner: Arc, state: Weak>) -> Self { + Self { inner, state } + } +} + +impl CatalogList for DynamicFileCatalog { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + self.inner.register_catalog(name, catalog) + } + + fn catalog_names(&self) -> Vec { + self.inner.catalog_names() + } + + fn catalog(&self, name: &str) -> Option> { + let state = self.state.clone(); + self.inner + .catalog(name) + .map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog, state)) as _) + } +} + +/// Wraps another catalog provider +struct DynamicFileCatalogProvider { + inner: Arc, + state: Weak>, +} + +impl DynamicFileCatalogProvider { + pub fn new( + inner: Arc, + state: Weak>, + ) -> Self { + Self { inner, state } + } +} + +impl CatalogProvider for DynamicFileCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.inner.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + let state = self.state.clone(); + self.inner + .schema(name) + .map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema, state)) as _) + } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Result>> { + self.inner.register_schema(name, schema) + } +} + +/// Wraps another schema provider +struct DynamicFileSchemaProvider { + inner: Arc, + state: Weak>, +} + +impl DynamicFileSchemaProvider { + pub fn new( + inner: Arc, + state: Weak>, + ) -> Self { + Self { inner, state } + } +} + +#[async_trait] +impl SchemaProvider for DynamicFileSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.inner.table_names() + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + self.inner.register_table(name, table) + } + + async fn table(&self, name: &str) -> Option> { + let inner_table = self.inner.table(name).await; + if inner_table.is_some() { + return inner_table; + } + + // if the inner schema provider didn't have a table by + // that name, try to treat it as a listing table + let state = self.state.upgrade()?.read().clone(); + let config = ListingTableConfig::new(ListingTableUrl::parse(name).ok()?) + .infer(&state) + .await + .ok()?; + Some(Arc::new(ListingTable::try_new(config).ok()?)) + } + + fn deregister_table(&self, name: &str) -> Result>> { + self.inner.deregister_table(name) + } + + fn table_exist(&self, name: &str) -> bool { + self.inner.table_exist(name) + } +} diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs index 44d8f06107f94..7eb3cb51c1f88 100644 --- a/datafusion-cli/src/lib.rs +++ b/datafusion-cli/src/lib.rs @@ -18,6 +18,7 @@ #![doc = include_str!("../README.md")] pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION"); +pub mod catalog; pub mod command; pub mod exec; pub mod functions; diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index b9ada1a1f0fad..fa4adce14a41b 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -21,6 +21,7 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::prelude::SessionContext; +use datafusion_cli::catalog::DynamicFileCatalog; use datafusion_cli::object_storage::DatafusionCliObjectStoreProvider; use datafusion_cli::{ exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION, @@ -106,6 +107,11 @@ pub async fn main() -> Result<()> { let mut ctx = SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env)); ctx.refresh_catalogs().await?; + // install dynamic catalog provider that knows how to open files + ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new( + ctx.state().catalog_list(), + ctx.state_weak_ref(), + ))); let mut print_options = PrintOptions { format: args.format, diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ba8accdcee46e..9d652f8112eb1 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -30,7 +30,6 @@ use crate::{ pub use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::var_provider::is_system_variables; use parking_lot::RwLock; -use std::ops::ControlFlow; use std::sync::Arc; use std::{ any::{Any, TypeId}, @@ -41,6 +40,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::Debug, }; +use std::{ops::ControlFlow, sync::Weak}; use arrow::datatypes::{DataType, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -1009,6 +1009,16 @@ impl SessionContext { state.execution_props.start_execution(); state } + + /// Get weak reference to [`SessionState`] + pub fn state_weak_ref(&self) -> Weak> { + Arc::downgrade(&self.state) + } + + /// Register [`CatalogList`] in [`SessionState`] + pub fn register_catalog_list(&mut self, catalog_list: Arc) { + self.state.write().catalog_list = catalog_list; + } } impl FunctionRegistry for SessionContext { @@ -1788,6 +1798,11 @@ impl SessionState { pub fn task_ctx(&self) -> Arc { Arc::new(TaskContext::from(self)) } + + /// Return catalog list + pub fn catalog_list(&self) -> Arc { + self.catalog_list.clone() + } } struct SessionContextProvider<'a> {