Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: SQL Catalog - namespaces
Signed-off-by: callum-ryan <[email protected]>
  • Loading branch information
callum-ryan committed Aug 9, 2024
commit 82be6c6d2b67e7d3c37761f858361ff8b8f8b95d
199 changes: 185 additions & 14 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use std::time::Duration;
use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
use iceberg::{
Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
};
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
use sqlx::AnyPool;
use sqlx::{AnyPool, Row};
use typed_builder::TypedBuilder;

use crate::error::from_sqlx_error;
use crate::error::{from_sqlx_error, no_such_namespace_err};

static CATALOG_TABLE_NAME: &str = "iceberg_tables";
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
Expand Down Expand Up @@ -61,7 +63,7 @@ pub struct SqlCatalogConfig {
#[derive(Debug)]
/// Sql catalog implementation.
pub struct SqlCatalog {
_name: String,
name: String,
connection: AnyPool,
_warehouse_location: String,
_fileio: FileIO,
Expand Down Expand Up @@ -132,7 +134,7 @@ impl SqlCatalog {
.map_err(from_sqlx_error)?;

Ok(SqlCatalog {
_name: config.name.to_owned(),
name: config.name.to_owned(),
connection: pool,
_warehouse_location: config.warehouse_location,
_fileio: config.file_io,
Expand Down Expand Up @@ -173,25 +175,194 @@ impl SqlCatalog {
impl Catalog for SqlCatalog {
async fn list_namespaces(
&self,
_parent: Option<&NamespaceIdent>,
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
todo!()
let table_namespaces_stmt = format!(
"SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
);
let namespaces_stmt = format!(
"SELECT {NAMESPACE_FIELD_NAME}
FROM {NAMESPACE_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
);

match parent {
Some(parent) => match self.namespace_exists(parent).await? {
true => {
let parent_table_namespaces_stmt = format!(
"{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')"
);
let parent_namespaces_stmt =
format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')");

let namespace_rows = self
.execute_statement(
&format!(
"{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}"
),
vec![
Some(&self.name),
Some(&parent.join(".")),
Some(&self.name),
Some(&parent.join(".")),
],
)
.await?;

Ok(namespace_rows
.iter()
.filter_map(|r| {
let nsp = r.try_get::<String, _>(0).ok();
nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok())
})
.collect())
}
false => no_such_namespace_err(parent),
},
None => {
let namespace_rows = self
.execute_statement(
&format!("{namespaces_stmt} UNION {table_namespaces_stmt}"),
vec![Some(&self.name), Some(&self.name)],
)
.await?;

Ok(namespace_rows
.iter()
.filter_map(|r| {
let nsp = r.try_get::<String, _>(0).ok();
nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok())
})
.collect())
}
}
}

async fn create_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace> {
todo!()
let exists = self.namespace_exists(namespace).await?;
if exists {
Err(Error::new(
iceberg::ErrorKind::Unexpected,
format!("Namespace {:?} already exists", namespace),
))
} else {
let namespace_str = namespace.join(".");
let insert = format!(
"INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
VALUES (?, ?, ?, ?)");
if !properties.is_empty() {
let mut query_args = vec![];
let mut properties_insert = insert.clone();
for (index, (key, value)) in properties.iter().enumerate() {
query_args.extend(
[
Some(&self.name),
Some(&namespace_str),
Some(key),
Some(value),
]
.iter(),
);
if index > 0 {
properties_insert = format!("{properties_insert}, (?, ?, ?, ?)");
}
}

self.execute_statement(&properties_insert, query_args)
.await?;

Ok(Namespace::with_properties(namespace.clone(), properties))
} else {
// set a default property of exists = true
self.execute_statement(&insert, vec![
Some(&self.name),
Some(&namespace_str),
Some(&"exists".to_string()),
Some(&"true".to_string()),
])
.await?;
Ok(Namespace::with_properties(namespace.clone(), properties))
}
}
}

async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
todo!()
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
let exists = self.namespace_exists(namespace).await?;
if exists {
let namespace_props = self
.execute_statement(
&format!(
"SELECT
{NAMESPACE_FIELD_NAME},
{NAMESPACE_FIELD_PROPERTY_KEY},
{NAMESPACE_FIELD_PROPERTY_VALUE}
FROM {NAMESPACE_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {NAMESPACE_FIELD_NAME} = ?"
),
vec![Some(&self.name), Some(&namespace.join("."))],
)
.await?;

let properties: HashMap<String, String> = namespace_props
.iter()
.filter_map(|r| {
let key = r.try_get(NAMESPACE_FIELD_PROPERTY_KEY).ok();
let value = r.try_get(NAMESPACE_FIELD_PROPERTY_VALUE).ok();
match (key, value) {
(Some(k), Some(v)) => Some((k, v)),
_ => None,
}
})
.collect();

Ok(Namespace::with_properties(namespace.clone(), properties))
} else {
no_such_namespace_err(namespace)
}
}

async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
todo!()
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
let namespace_str = namespace.join(".");

let table_namespaces = self
.execute_statement(
&format!(
"SELECT 1 FROM {CATALOG_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
LIMIT 1"
),
vec![Some(&self.name), Some(&namespace_str)],
)
.await?;

if !table_namespaces.is_empty() {
Ok(true)
} else {
let namespaces = self
.execute_statement(
&format!(
"SELECT 1 FROM {NAMESPACE_TABLE_NAME}
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {NAMESPACE_FIELD_NAME} = ?
LIMIT 1"
),
vec![Some(&self.name), Some(&namespace_str)],
)
.await?;
if !namespaces.is_empty() {
Ok(true)
} else {
Ok(false)
}
}
}

async fn update_namespace(
Expand Down
9 changes: 8 additions & 1 deletion crates/catalog/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use iceberg::{Error, ErrorKind};
use iceberg::{Error, ErrorKind, NamespaceIdent, Result};

/// Format an sqlx error into iceberg error.
pub fn from_sqlx_error(error: sqlx::Error) -> Error {
Expand All @@ -25,3 +25,10 @@ pub fn from_sqlx_error(error: sqlx::Error) -> Error {
)
.with_source(error)
}

pub fn no_such_namespace_err<T>(namespace: &NamespaceIdent) -> Result<T> {
Err(Error::new(
ErrorKind::Unexpected,
format!("No such namespace: {:?}", namespace),
))
}