Skip to content
Merged
Prev Previous commit
Next Next commit
fix: add default properties to all new namespaces
Signed-off-by: callum-ryan <[email protected]>
  • Loading branch information
callum-ryan committed Aug 21, 2024
commit b63ecd54c20013fa7c5cc18649f44553b6468f90
43 changes: 20 additions & 23 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,26 +287,31 @@ impl Catalog for SqlCatalog {
"INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
VALUES (?, ?, ?, ?)");
if !properties.is_empty() {
let mut query_args = Vec::with_capacity(properties.len() * 4);
let mut properties_insert = insert.clone();
for (index, (key, value)) in properties.iter().enumerate() {
let mut insert_properties = properties.clone();
insert_properties.insert("exists".to_string(), "true".to_string());

let mut query_args = Vec::with_capacity(insert_properties.len() * 4);
let mut insert_stmt = insert.clone();
for (index, (key, value)) in insert_properties.iter().enumerate() {
query_args.extend_from_slice(&[
Some(self.name.as_str()),
Some(namespace_str.as_str()),
Some(key.as_str()),
Some(value.as_str()),
]);
if index > 0 {
properties_insert = format!("{properties_insert}, (?, ?, ?, ?)");
insert_stmt.push_str(", (?, ?, ?, ?)");
}
}

self.execute(&properties_insert, query_args, None).await?;
self.execute(&insert_stmt, query_args, None).await?;

Ok(Namespace::with_properties(namespace.clone(), properties))
Ok(Namespace::with_properties(
namespace.clone(),
insert_properties,
))
} else {
// set a default property of exists = true
// up for debate if this is worthwhile
self.execute(
&insert,
vec![
Expand Down Expand Up @@ -468,21 +473,8 @@ impl Catalog for SqlCatalog {
}
}

async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let exists = self.namespace_exists(namespace).await?;
if exists {
// TODO: check that the namespace is empty
self.execute(
&format!("DELETE FROM {NAMESPACE_TABLE_NAME} WHERE {NAMESPACE_FIELD_NAME} = ?"),
vec![Some(&namespace.join("."))],
None,
)
.await?;

Ok(())
} else {
no_such_namespace_err(namespace)
}
async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
todo!()
}

async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
Expand Down Expand Up @@ -729,7 +721,7 @@ mod tests {
let catalog = new_sql_catalog(warehouse_loc).await;
let namespace_ident = NamespaceIdent::new("abc".into());

let mut properties: HashMap<String, String> = HashMap::new();
let mut properties = default_properties();
properties.insert("k".into(), "v".into());

assert_eq!(
Expand Down Expand Up @@ -841,6 +833,7 @@ mod tests {
}

#[tokio::test]
#[ignore = "drop_namespace not implemented"]
async fn test_drop_namespace() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
Expand All @@ -853,6 +846,7 @@ mod tests {
}

#[tokio::test]
#[ignore = "drop_namespace not implemented"]
async fn test_drop_nested_namespace() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
Expand All @@ -871,6 +865,7 @@ mod tests {
}

#[tokio::test]
#[ignore = "drop_namespace not implemented"]
async fn test_drop_deeply_nested_namespace() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
Expand Down Expand Up @@ -903,6 +898,7 @@ mod tests {
}

#[tokio::test]
#[ignore = "drop_namespace not implemented"]
async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
Expand All @@ -922,6 +918,7 @@ mod tests {
}

#[tokio::test]
#[ignore = "drop_namespace not implemented"]
async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;
Expand Down