diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py
index 3b77fd47f0..858f86f8fc 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -20,6 +20,7 @@
     Any,
     Union,
 )
+from urllib.parse import quote, unquote
 
 from pydantic import Field, field_validator
 from requests import HTTPError, Session
@@ -131,7 +132,8 @@ class IdentifierKind(Enum):
 AUTH = "auth"
 CUSTOM = "custom"
 
-NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)
+NAMESPACE_SEPARATOR_PROPERTY = "namespace-separator"
+DEFAULT_NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)
 
 
 def _retry_hook(retry_state: RetryCallState) -> None:
@@ -214,6 +216,7 @@ class ListViewsResponse(IcebergBaseModel):
 class RestCatalog(Catalog):
     uri: str
     _session: Session
+    _namespace_separator: str
 
     def __init__(self, name: str, **properties: str):
         """Rest Catalog.
@@ -228,6 +231,10 @@ def __init__(self, name: str, **properties: str):
         self.uri = properties[URI]
         self._fetch_config()
         self._session = self._create_session()
+        separator_from_properties = self.properties.get(NAMESPACE_SEPARATOR_PROPERTY, DEFAULT_NAMESPACE_SEPARATOR)
+        if not separator_from_properties:
+            raise ValueError("Namespace separator cannot be an empty string")
+        self._namespace_separator = unquote(separator_from_properties)
 
     def _create_session(self) -> Session:
         """Create a request session with provided catalog configuration."""
@@ -351,6 +358,16 @@ def _extract_optional_oauth_params(self) -> dict[str, str]:
 
         return optional_oauth_param
 
+    def _encode_namespace_path(self, namespace: Identifier) -> str:
+        """
+        Encode a namespace for use as a path parameter in a URL.
+
+        Each part of the namespace is URL-encoded using `urllib.parse.quote`
+        (ensuring characters like '/' are encoded) and then joined by the
+        configured namespace separator, which is URL-encoded the same way.
+        """
+        return quote(self._namespace_separator, safe="").join(quote(part, safe="") for part in namespace)
+
     def _fetch_config(self) -> None:
         params = {}
         if warehouse_location := self.properties.get(WAREHOUSE_LOCATION):
@@ -382,10 +399,16 @@ def _split_identifier_for_path(
         self, identifier: str | Identifier | TableIdentifier, kind: IdentifierKind = IdentifierKind.TABLE
     ) -> Properties:
         if isinstance(identifier, TableIdentifier):
-            return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root), kind.value: identifier.name}
+            return {
+                "namespace": self._encode_namespace_path(tuple(identifier.namespace.root)),
+                kind.value: quote(identifier.name, safe=""),
+            }
         identifier_tuple = self._identifier_to_validated_tuple(identifier)
 
-        return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), kind.value: identifier_tuple[-1]}
+        return {
+            "namespace": self._encode_namespace_path(identifier_tuple[:-1]),
+            kind.value: quote(identifier_tuple[-1], safe=""),
+        }
 
     def _split_identifier_for_json(self, identifier: str | Identifier) -> dict[str, Identifier | str]:
         identifier_tuple = self._identifier_to_validated_tuple(identifier)
@@ -600,7 +623,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
     @retry(**_RETRY_ARGS)
     def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
-        namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
+        namespace_concat = self._encode_namespace_path(namespace_tuple)
         response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat))
         try:
             response.raise_for_status()
@@ -681,7 +704,7 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
     @retry(**_RETRY_ARGS)
     def list_views(self, namespace: str | Identifier) -> list[Identifier]:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
-        namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
+        namespace_concat = self._encode_namespace_path(namespace_tuple)
         response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat))
         try:
             response.raise_for_status()
@@ -748,7 +771,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties =
     @retry(**_RETRY_ARGS)
     def drop_namespace(self, namespace: str | Identifier) -> None:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
-        namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
+        namespace = self._encode_namespace_path(namespace_tuple)
         response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace))
         try:
             response.raise_for_status()
@@ -760,7 +783,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
         namespace_tuple = self.identifier_to_tuple(namespace)
         response = self._session.get(
             self.url(
-                f"{Endpoints.list_namespaces}?parent={NAMESPACE_SEPARATOR.join(namespace_tuple)}"
+                f"{Endpoints.list_namespaces}?parent={self._encode_namespace_path(namespace_tuple)}"
                 if namespace_tuple
                 else Endpoints.list_namespaces
             ),
@@ -775,7 +798,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
     @retry(**_RETRY_ARGS)
     def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
-        namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
+        namespace = self._encode_namespace_path(namespace_tuple)
         response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace))
         try:
             response.raise_for_status()
@@ -789,7 +812,7 @@ def update_namespace_properties(
         self, namespace: str | Identifier, removals: set[str] | None = None, updates: Properties = EMPTY_DICT
     ) -> PropertiesUpdateSummary:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
-        namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
+        namespace = self._encode_namespace_path(namespace_tuple)
         payload = {"removals": list(removals or []), "updates": updates}
         response = self._session.post(self.url(Endpoints.update_namespace_properties, namespace=namespace), json=payload)
         try:
@@ -806,7 +829,7 @@ def update_namespace_properties(
     @retry(**_RETRY_ARGS)
     def namespace_exists(self, namespace: str | Identifier) -> bool:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
-        namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
+        namespace = self._encode_namespace_path(namespace_tuple)
         response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace))
 
         if response.status_code == 404:
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index b8bee00225..21796b7362 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -1900,6 +1900,31 @@ def test_rest_catalog_with_google_credentials_path(
     assert actual_headers["Authorization"] == expected_auth_header
 
 
+def test_custom_namespace_separator(rest_mock: Mocker) -> None:
+    custom_separator = "-"
+    namespace_part1 = "some"
+    namespace_part2 = "namespace"
+    # The expected URL path segment should use the literal custom_separator
+    expected_url_path_segment = f"{namespace_part1}{custom_separator}{namespace_part2}"
+
+    rest_mock.get(
+        f"{TEST_URI}v1/config",
+        json={"defaults": {}, "overrides": {}},
+        status_code=200,
+    )
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/{expected_url_path_segment}",
+        json={"namespace": [namespace_part1, namespace_part2], "properties": {"prop": "yes"}},
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"namespace-separator": custom_separator})
+    catalog.load_namespace_properties((namespace_part1, namespace_part2))
+
+    assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/{expected_url_path_segment}"
+
+
 @pytest.mark.filterwarnings(
     "ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
 )