diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
new file mode 100644
index 0000000000000..95e107653ddd9
--- /dev/null
+++ b/datafusion-cli/src/catalog.rs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Catalog, schema and table lookup wrappers used by `datafusion-cli`.
+//!
+//! Every wrapper in this module forwards its calls to an inner provider. The
+//! schema-level wrapper is where an unknown table name is meant to be treated
+//! as a file name; that fallback is still a TODO.
+
+use datafusion::catalog::catalog::{CatalogList, CatalogProvider};
+use datafusion::catalog::schema::SchemaProvider;
+use datafusion::datasource::TableProvider;
+use datafusion::error::{DataFusionError, Result};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Wraps another catalog list so that every catalog it returns is itself
+/// wrapped in a `DynamicFileCatalogProvider`.
+///
+/// The goal is to automatically create table providers for local files when
+/// a table name is not otherwise known (not implemented yet).
+pub struct DynamicFileCatalog {
+    inner: Arc<dyn CatalogList>,
+}
+
+impl DynamicFileCatalog {
+    /// Creates a catalog list that delegates to `inner`.
+    pub fn new(inner: Arc<dyn CatalogList>) -> Self {
+        Self { inner }
+    }
+}
+
+impl CatalogList for DynamicFileCatalog {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn register_catalog(
+        &self,
+        name: String,
+        catalog: Arc<dyn CatalogProvider>,
+    ) -> Option<Arc<dyn CatalogProvider>> {
+        self.inner.register_catalog(name, catalog)
+    }
+
+    fn catalog_names(&self) -> Vec<String> {
+        self.inner.catalog_names()
+    }
+
+    /// Returns the inner catalog called `name`, wrapped so that its schemas
+    /// are served through `DynamicFileCatalogProvider`.
+    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+        // TODO: fallback to sorting out filenames
+        self.inner
+            .catalog(name)
+            .map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog)) as _)
+    }
+}
+
+/// Wraps another catalog provider so that every schema it returns is itself
+/// wrapped in a `DynamicFileSchemaProvider`.
+struct DynamicFileCatalogProvider {
+    inner: Arc<dyn CatalogProvider>,
+}
+
+impl DynamicFileCatalogProvider {
+    /// Creates a catalog provider that delegates to `inner`.
+    pub fn new(inner: Arc<dyn CatalogProvider>) -> Self {
+        Self { inner }
+    }
+}
+
+impl CatalogProvider for DynamicFileCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.inner.schema_names()
+    }
+
+    /// Returns the inner schema called `name`, wrapped so that its tables are
+    /// served through `DynamicFileSchemaProvider`.
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        // TODO fallback to sorting out other filenames (with periods)
+        self.inner
+            .schema(name)
+            .map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema)) as _)
+    }
+
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        self.inner.register_schema(name, schema)
+    }
+}
+
+/// Wraps another schema provider; this is where unknown table names are
+/// meant to be resolved as files (not implemented yet).
+struct DynamicFileSchemaProvider {
+    inner: Arc<dyn SchemaProvider>,
+}
+
+impl DynamicFileSchemaProvider {
+    /// Creates a schema provider that delegates to `inner`.
+    pub fn new(inner: Arc<dyn SchemaProvider>) -> Self {
+        Self { inner }
+    }
+}
+
+impl SchemaProvider for DynamicFileSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.inner.table_names()
+    }
+
+    /// Returns the table called `name` from the inner schema provider.
+    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+        // TODO: if the inner schema provider has no table by that name, treat
+        // `name` as a file name and build a listing table provider for it.
+        // Until that exists an unknown name simply yields `None`.
+        self.inner.table(name)
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        self.inner.register_table(name, table)
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        self.inner.deregister_table(name)
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.inner.table_exist(name)
+    }
+}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 44d8f06107f94..7eb3cb51c1f88 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -18,6 +18,7 @@
 #![doc = include_str!("../README.md")]
 pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
 
+pub mod catalog;
 pub mod command;
 pub mod exec;
 pub mod functions;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index c4e198ff009c8..2a2e209b3edf3 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -21,6 +21,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::prelude::SessionContext;
+use datafusion_cli::catalog::DynamicFileCatalog;
 use datafusion_cli::object_storage::DatafusionCliObjectStoreProvider;
 use datafusion_cli::{
     exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
@@ -105,6 +106,16 @@ pub async fn main() -> Result<()> {
     let runtime_env = create_runtime_env()?;
     let mut ctx =
         SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env));
     ctx.refresh_catalogs().await?;
+
+    // Wrap the session's catalog list in `DynamicFileCatalog`. This is done
+    // after `refresh_catalogs` so the refresh still sees the unwrapped
+    // providers: the wrappers' `as_any` returns the wrapper itself, which
+    // would defeat a downcast to a concrete provider type.
+    {
+        let mut state = ctx.state.write();
+        state.catalog_list =
+            Arc::new(DynamicFileCatalog::new(state.catalog_list.clone()));
+    }
 
     let mut print_options = PrintOptions {