Skip to content

Commit befb42e

Browse files
committed
support select .. FROM 'parquet.file' in datafusion-cli
1 parent 169b522 commit befb42e

File tree

6 files changed

+192
-1
lines changed

6 files changed

+192
-1
lines changed

datafusion-cli/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ readme = "README.md"
3030

3131
[dependencies]
3232
arrow = "29.0.0"
33+
async-trait = "0.1.41"
3334
clap = { version = "3", features = ["derive", "cargo"] }
3435
datafusion = { path = "../datafusion/core", version = "15.0.0" }
3536
dirs = "4.0.0"
3637
env_logger = "0.9"
3738
mimalloc = { version = "0.1", default-features = false }
3839
object_store = { version = "0.5.0", features = ["aws", "gcp"] }
40+
parking_lot = { version = "0.12" }
3941
rustyline = "10.0"
4042
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
4143
url = "2.2"

datafusion-cli/src/catalog.rs

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use async_trait::async_trait;
19+
use datafusion::catalog::catalog::{CatalogList, CatalogProvider};
20+
use datafusion::catalog::schema::SchemaProvider;
21+
use datafusion::datasource::listing::{
22+
ListingTable, ListingTableConfig, ListingTableUrl,
23+
};
24+
use datafusion::datasource::TableProvider;
25+
use datafusion::error::Result;
26+
use datafusion::execution::context::SessionState;
27+
use parking_lot::RwLock;
28+
use std::any::Any;
29+
use std::sync::{Arc, Weak};
30+
31+
/// Wraps another catalog, automatically creating table providers
32+
/// for local files if needed
33+
pub struct DynamicFileCatalog {
34+
inner: Arc<dyn CatalogList>,
35+
state: Weak<RwLock<SessionState>>,
36+
}
37+
38+
impl DynamicFileCatalog {
39+
pub fn new(inner: Arc<dyn CatalogList>, state: Weak<RwLock<SessionState>>) -> Self {
40+
Self { inner, state }
41+
}
42+
}
43+
44+
impl CatalogList for DynamicFileCatalog {
45+
fn as_any(&self) -> &dyn Any {
46+
self
47+
}
48+
49+
fn register_catalog(
50+
&self,
51+
name: String,
52+
catalog: Arc<dyn CatalogProvider>,
53+
) -> Option<Arc<dyn CatalogProvider>> {
54+
self.inner.register_catalog(name, catalog)
55+
}
56+
57+
fn catalog_names(&self) -> Vec<String> {
58+
self.inner.catalog_names()
59+
}
60+
61+
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
62+
let state = self.state.clone();
63+
self.inner
64+
.catalog(name)
65+
.map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog, state)) as _)
66+
}
67+
}
68+
69+
/// Wraps another catalog provider
70+
struct DynamicFileCatalogProvider {
71+
inner: Arc<dyn CatalogProvider>,
72+
state: Weak<RwLock<SessionState>>,
73+
}
74+
75+
impl DynamicFileCatalogProvider {
76+
pub fn new(
77+
inner: Arc<dyn CatalogProvider>,
78+
state: Weak<RwLock<SessionState>>,
79+
) -> Self {
80+
Self { inner, state }
81+
}
82+
}
83+
84+
impl CatalogProvider for DynamicFileCatalogProvider {
85+
fn as_any(&self) -> &dyn Any {
86+
self
87+
}
88+
89+
fn schema_names(&self) -> Vec<String> {
90+
self.inner.schema_names()
91+
}
92+
93+
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
94+
let state = self.state.clone();
95+
self.inner
96+
.schema(name)
97+
.map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema, state)) as _)
98+
}
99+
100+
fn register_schema(
101+
&self,
102+
name: &str,
103+
schema: Arc<dyn SchemaProvider>,
104+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
105+
self.inner.register_schema(name, schema)
106+
}
107+
}
108+
109+
/// Wraps another schema provider
110+
struct DynamicFileSchemaProvider {
111+
inner: Arc<dyn SchemaProvider>,
112+
state: Weak<RwLock<SessionState>>,
113+
}
114+
115+
impl DynamicFileSchemaProvider {
116+
pub fn new(
117+
inner: Arc<dyn SchemaProvider>,
118+
state: Weak<RwLock<SessionState>>,
119+
) -> Self {
120+
Self { inner, state }
121+
}
122+
}
123+
124+
#[async_trait]
125+
impl SchemaProvider for DynamicFileSchemaProvider {
126+
fn as_any(&self) -> &dyn Any {
127+
self
128+
}
129+
130+
fn table_names(&self) -> Vec<String> {
131+
self.inner.table_names()
132+
}
133+
134+
fn register_table(
135+
&self,
136+
name: String,
137+
table: Arc<dyn TableProvider>,
138+
) -> Result<Option<Arc<dyn TableProvider>>> {
139+
self.inner.register_table(name, table)
140+
}
141+
142+
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
143+
let inner_table = self.inner.table(name).await;
144+
if inner_table.is_some() {
145+
return inner_table;
146+
}
147+
148+
// if the inner schema provider didn't have a table by
149+
// that name, try to treat it as a listing table
150+
let state = self.state.upgrade()?.read().clone();
151+
let config = ListingTableConfig::new(ListingTableUrl::parse(name).ok()?)
152+
.infer(&state)
153+
.await
154+
.ok()?;
155+
Some(Arc::new(ListingTable::try_new(config).ok()?))
156+
}
157+
158+
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
159+
self.inner.deregister_table(name)
160+
}
161+
162+
fn table_exist(&self, name: &str) -> bool {
163+
self.inner.table_exist(name)
164+
}
165+
}

datafusion-cli/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#![doc = include_str!("../README.md")]
1919
pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
2020

21+
pub mod catalog;
2122
pub mod command;
2223
pub mod exec;
2324
pub mod functions;

datafusion-cli/src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use datafusion::error::{DataFusionError, Result};
2121
use datafusion::execution::context::SessionConfig;
2222
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
2323
use datafusion::prelude::SessionContext;
24+
use datafusion_cli::catalog::DynamicFileCatalog;
2425
use datafusion_cli::object_storage::DatafusionCliObjectStoreProvider;
2526
use datafusion_cli::{
2627
exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
@@ -106,6 +107,11 @@ pub async fn main() -> Result<()> {
106107
let mut ctx =
107108
SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env));
108109
ctx.refresh_catalogs().await?;
110+
// install dynamic catalog provider that knows how to open files
111+
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
112+
ctx.state().catalog_list(),
113+
ctx.state_weak_ref(),
114+
)));
109115

110116
let mut print_options = PrintOptions {
111117
format: args.format,

datafusion/core/src/execution/context.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use crate::{
3030
pub use datafusion_physical_expr::execution_props::ExecutionProps;
3131
use datafusion_physical_expr::var_provider::is_system_variables;
3232
use parking_lot::RwLock;
33-
use std::ops::ControlFlow;
3433
use std::sync::Arc;
3534
use std::{
3635
any::{Any, TypeId},
@@ -41,6 +40,7 @@ use std::{
4140
collections::{HashMap, HashSet},
4241
fmt::Debug,
4342
};
43+
use std::{ops::ControlFlow, sync::Weak};
4444

4545
use arrow::datatypes::{DataType, SchemaRef};
4646
use arrow::record_batch::RecordBatch;
@@ -1009,6 +1009,16 @@ impl SessionContext {
10091009
state.execution_props.start_execution();
10101010
state
10111011
}
1012+
1013+
/// Get weak reference to [`SessionState`]
1014+
pub fn state_weak_ref(&self) -> Weak<RwLock<SessionState>> {
1015+
Arc::downgrade(&self.state)
1016+
}
1017+
1018+
/// Register [`CatalogList`] in [`SessionState`]
1019+
pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogList>) {
1020+
self.state.write().catalog_list = catalog_list;
1021+
}
10121022
}
10131023

10141024
impl FunctionRegistry for SessionContext {
@@ -1788,6 +1798,11 @@ impl SessionState {
17881798
pub fn task_ctx(&self) -> Arc<TaskContext> {
17891799
Arc::new(TaskContext::from(self))
17901800
}
1801+
1802+
/// Return catalog list
1803+
pub fn catalog_list(&self) -> Arc<dyn CatalogList> {
1804+
self.catalog_list.clone()
1805+
}
17911806
}
17921807

17931808
struct SessionContextProvider<'a> {

0 commit comments

Comments
 (0)