Skip to content
Merged
Next Next commit
Default to RemoteStore for fsspec URIs
  • Loading branch information
TomAugspurger committed Sep 17, 2024
commit f81364b9cd719c39233e94aeafbb9ecfd77711f5
61 changes: 49 additions & 12 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async def open(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to open_array
) -> AsyncArray | AsyncGroup:
"""Convenience function to open a group or array using file-mode-like semantics.
Expand All @@ -211,6 +212,9 @@ async def open(
The zarr format to use when saving.
path : str or None, optional
The path within the store to open.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
**kwargs
Additional parameters are passed through to :func:`zarr.creation.open_array` or
:func:`zarr.hierarchy.open_group`.
Expand All @@ -221,7 +225,7 @@ async def open(
Return type depends on what exists in the given store.
"""
zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)
store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)

if path is not None:
store_path = store_path / path
Expand Down Expand Up @@ -276,6 +280,7 @@ async def save_array(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to create
) -> None:
"""Convenience function to save a NumPy array to the local file system, following a
Expand All @@ -291,6 +296,9 @@ async def save_array(
The zarr format to use when saving.
path : str or None, optional
The path within the store where the array will be saved.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
kwargs
Passed through to :func:`create`, e.g., compressor.
"""
Expand All @@ -299,7 +307,7 @@ async def save_array(
or _default_zarr_version()
)

store_path = await make_store_path(store, mode="w")
store_path = await make_store_path(store, mode="w", storage_options=storage_options)
if path is not None:
store_path = store_path / path
new = await AsyncArray.create(
Expand All @@ -319,6 +327,7 @@ async def save_group(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: NDArrayLike,
) -> None:
"""Convenience function to save several NumPy arrays to the local file system, following a
Expand All @@ -334,22 +343,40 @@ async def save_group(
The zarr format to use when saving.
path : str or None, optional
Path within the store where the group will be saved.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
kwargs
NumPy arrays with data to save.
"""
zarr_format = (
_handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)
_handle_zarr_version_or_format(
zarr_version=zarr_version,
zarr_format=zarr_format,
)
or _default_zarr_version()
)

if len(args) == 0 and len(kwargs) == 0:
raise ValueError("at least one array must be provided")
aws = []
for i, arr in enumerate(args):
aws.append(save_array(store, arr, zarr_format=zarr_format, path=f"{path}/arr_{i}"))
aws.append(
save_array(
store,
arr,
zarr_format=zarr_format,
path=f"{path}/arr_{i}",
storage_options=storage_options,
)
)
for k, arr in kwargs.items():
_path = f"{path}/{k}" if path is not None else k
aws.append(save_array(store, arr, zarr_format=zarr_format, path=_path))
aws.append(
save_array(
store, arr, zarr_format=zarr_format, path=_path, storage_options=storage_options
)
)
await asyncio.gather(*aws)


Expand Down Expand Up @@ -418,6 +445,7 @@ async def group(
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # not used
attributes: dict[str, JSON] | None = None,
storage_options: dict[str, Any] | None = None,
) -> AsyncGroup:
"""Create a group.

Expand All @@ -444,6 +472,9 @@ async def group(
to users. Use `numpy.empty(())` by default.
zarr_format : {2, 3, None}, optional
The zarr format to use when saving.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.

Returns
-------
Expand All @@ -453,7 +484,7 @@ async def group(

zarr_format = _handle_zarr_version_or_format(zarr_version=zarr_version, zarr_format=zarr_format)

store_path = await make_store_path(store)
store_path = await make_store_path(store, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -488,7 +519,7 @@ async def open_group(
synchronizer: Any = None, # not used
path: str | None = None,
chunk_store: StoreLike | None = None, # not used
storage_options: dict[str, Any] | None = None, # not used
storage_options: dict[str, Any] | None = None,
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
meta_array: Any | None = None, # not used
Expand Down Expand Up @@ -548,10 +579,8 @@ async def open_group(
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)
if chunk_store is not None:
warnings.warn("chunk_store is not yet implemented", RuntimeWarning, stacklevel=2)
if storage_options is not None:
warnings.warn("storage_options is not yet implemented", RuntimeWarning, stacklevel=2)

store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -603,6 +632,7 @@ async def create(
) = None,
codecs: Iterable[Codec | dict[str, JSON]] | None = None,
dimension_names: Iterable[str] | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any,
) -> AsyncArray:
"""Create an array.
Expand Down Expand Up @@ -674,6 +704,9 @@ async def create(
to users. Use `numpy.empty(())` by default.

.. versionadded:: 2.13
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.

Returns
-------
Expand Down Expand Up @@ -725,7 +758,7 @@ async def create(
warnings.warn("meta_array is not yet implemented", RuntimeWarning, stacklevel=2)

mode = kwargs.pop("mode", cast(AccessModeLiteral, "r" if read_only else "w"))
store_path = await make_store_path(store, mode=mode)
store_path = await make_store_path(store, mode=mode, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down Expand Up @@ -875,6 +908,7 @@ async def open_array(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: PathLike | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: Any, # TODO: type kwargs as valid args to save
) -> AsyncArray:
"""Open an array using file-mode-like semantics.
Expand All @@ -887,6 +921,9 @@ async def open_array(
The zarr format to use when saving.
path : string, optional
Path in store to array.
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
**kwargs
Any keyword arguments to pass to the array constructor.

Expand All @@ -896,7 +933,7 @@ async def open_array(
The opened array.
"""

store_path = await make_store_path(store)
store_path = await make_store_path(store, storage_options=storage_options)
if path is not None:
store_path = store_path / path

Expand Down
2 changes: 2 additions & 0 deletions src/zarr/api/synchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def save_group(
zarr_version: ZarrFormat | None = None, # deprecated
zarr_format: ZarrFormat | None = None,
path: str | None = None,
storage_options: dict[str, Any] | None = None,
**kwargs: NDArrayLike,
) -> None:
return sync(
Expand All @@ -143,6 +144,7 @@ def save_group(
zarr_version=zarr_version,
zarr_format=zarr_format,
path=path,
storage_options=storage_options,
**kwargs,
)
)
Expand Down
41 changes: 33 additions & 8 deletions src/zarr/store/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import fsspec
import fsspec.implementations

from zarr.abc.store import AccessMode, Store
from zarr.core.buffer import Buffer, default_buffer_prototype
from zarr.core.common import ZARR_JSON, ZARRAY_JSON, ZGROUP_JSON, ZarrFormat
from zarr.errors import ContainsArrayAndGroupError, ContainsArrayError, ContainsGroupError
from zarr.store.local import LocalStore
from zarr.store.memory import MemoryStore

# from zarr.store.remote import RemoteStore

if TYPE_CHECKING:
from zarr.core.buffer import BufferPrototype
from zarr.core.common import AccessModeLiteral
Expand Down Expand Up @@ -75,30 +80,50 @@ def __eq__(self, other: Any) -> bool:


async def make_store_path(
store_like: StoreLike | None, *, mode: AccessModeLiteral | None = None
store_like: StoreLike | None,
*,
path: str | None = None,
mode: AccessModeLiteral | None = None,
storage_options: dict[str, Any] | None = None,
) -> StorePath:
from zarr.store.remote import RemoteStore # circular import

if isinstance(store_like, StorePath):
if mode is not None:
assert AccessMode.from_literal(mode) == store_like.store.mode
return store_like
result = store_like
elif isinstance(store_like, Store):
if mode is not None:
assert AccessMode.from_literal(mode) == store_like.mode
await store_like._ensure_open()
return StorePath(store_like)
result = StorePath(store_like)
elif store_like is None:
if mode is None:
mode = "w" # exception to the default mode = 'r'
return StorePath(await MemoryStore.open(mode=mode))
result = StorePath(await MemoryStore.open(mode=mode))
elif isinstance(store_like, Path):
return StorePath(await LocalStore.open(root=store_like, mode=mode or "r"))
result = StorePath(await LocalStore.open(root=store_like, mode=mode or "r"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can pass **storage_options to LocalStore as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be missing something, but I don't think that'll work. LocalStore.open will call LocalStore.__init__, which just takes root and mode, which are passed as regular args here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. I was thinking auto_mkdir would be passed through but if that's not the case, let's not get distracted here.

elif isinstance(store_like, str):
return StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r"))
try:
fs, path = fsspec.url_to_fs(store_like, **storage_options)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: this constructs the actual filesystem class instance, but we have to go back to a URL (and reconstruct the filesystem instance) for the RemoteStore interface. It'd be nice to avoid that.

except Exception:
# not sure what to do here, but I don't want this to fail...
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bad, but in my experience fsspec.url_to_fs can run pretty much arbitrary code, depending on what packages you have installed and what URI you pass. I really don't want this to fail and cause issues for people who just want a local path. maybe we'd always get an fsspec LocalFileSystem for a plain, non-fsspec URI string, but I'm not sure.

pass
else:
if "file" not in fs.protocol:
storage_options = storage_options or {}
return StorePath(RemoteStore(url=store_like, mode=mode or "r", **storage_options))
result = StorePath(await LocalStore.open(root=Path(store_like), mode=mode or "r"))
elif isinstance(store_like, dict):
# We deliberate only consider dict[str, Buffer] here, and not arbitrary mutable mappings.
# By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate.
return StorePath(await MemoryStore.open(store_dict=store_like, mode=mode))
raise TypeError
result = StorePath(await MemoryStore.open(store_dict=store_like, mode=mode))
else:
raise TypeError

if path is not None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I forgot to revert this. It's not used yet.

We call make_store_path multiple times in a single call by the user (e.g. save_array calls make_store_path and passes the result to AsyncArray.create which also calls make_store_path). I was thinking about consolidating all the store / path stuff in a single spot, but it ended up being a bit much and I wasn't sure what the tolerance was for expanding the API of methods like AsyncArray.create even more. Do we want users to do

AsyncArray.create(store=StorePath(RemoteStore(..., **storage_options), path))

or

AsyncArray.create(store="s3://bucket/path.zarr", storage_options=...)

result = result / path
return result


async def ensure_no_existing_node(store_path: StorePath, zarr_format: ZarrFormat) -> None:
Expand Down
9 changes: 9 additions & 0 deletions tests/v3/test_store/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from zarr.store.common import make_store_path
from zarr.store.local import LocalStore
from zarr.store.memory import MemoryStore
from zarr.store.remote import RemoteStore


async def test_make_store_path(tmpdir: str) -> None:
Expand Down Expand Up @@ -34,3 +35,11 @@ async def test_make_store_path(tmpdir: str) -> None:

with pytest.raises(TypeError):
await make_store_path(1) # type: ignore[arg-type]


async def test_make_store_path_fsspec(monkeypatch) -> None:
import fsspec.implementations.memory

monkeypatch.setattr(fsspec.implementations.memory.MemoryFileSystem, "async_impl", True)
store_path = await make_store_path("memory://")
assert isinstance(store_path.store, RemoteStore)
41 changes: 41 additions & 0 deletions tests/v3/test_store/test_remote.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
from collections.abc import Generator

Expand All @@ -6,6 +7,7 @@
import pytest
from upath import UPath

import zarr.api.asynchronous
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.core.sync import sync
from zarr.store import RemoteStore
Expand Down Expand Up @@ -158,3 +160,42 @@ def test_store_supports_partial_writes(self, store: RemoteStore) -> None:

def test_store_supports_listing(self, store: RemoteStore) -> None:
assert True

async def test_remote_store_from_uri(
self, store: RemoteStore, store_kwargs: dict[str, str | bool]
):
storage_options = {
"endpoint_url": endpoint_url,
"anon": False,
}

meta = {"attributes": {"key": "value"}, "zarr_format": 3, "node_type": "group"}

await store.set(
"zarr.json",
self.buffer_cls.from_bytes(json.dumps(meta).encode()),
)
group = await zarr.api.asynchronous.open_group(
store=store._url, storage_options=storage_options
)
assert dict(group.attrs) == {"key": "value"}

meta["attributes"]["key"] = "value-2"
await store.set(
"directory-2/zarr.json",
self.buffer_cls.from_bytes(json.dumps(meta).encode()),
)
group = await zarr.api.asynchronous.open_group(
store="/".join([store._url.rstrip("/"), "directory-2"]), storage_options=storage_options
)
assert dict(group.attrs) == {"key": "value-2"}

meta["attributes"]["key"] = "value-3"
await store.set(
"directory-3/zarr.json",
self.buffer_cls.from_bytes(json.dumps(meta).encode()),
)
group = await zarr.api.asynchronous.open_group(
store=store._url, path="directory-3", storage_options=storage_options
)
assert dict(group.attrs) == {"key": "value-3"}