Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor and test asynclocalfs
  • Loading branch information
skshetry committed Sep 27, 2022
commit 661fce66fa943eb3c9003100c31c90c0fe4fba78
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tests =
pytest-sugar==0.9.5
pytest-cov==3.0.0
pytest-mock==3.8.2
pytest-asyncio==0.19.0
pylint==2.15.0
mypy==0.971
%(all)s
Expand Down
125 changes: 47 additions & 78 deletions src/morefs/asyn_local.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
import asyncio
import datetime
import errno
import functools
import os
import posixpath
import shutil

import aiofile
import aiofiles.os
from aiofiles.os import wrap # type: ignore[attr-defined]
from fsspec import AbstractFileSystem
from fsspec.asyn import AbstractBufferedFile, AsyncFileSystem
from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem
from fsspec.implementations.local import LocalFileSystem

aiofiles.os.chmod = wrap(os.chmod) # type: ignore[attr-defined]
aiofiles.os.utime = wrap(os.utime) # type: ignore[attr-defined]
aiofiles.os.path.islink = wrap(os.path.islink) # type: ignore[attr-defined]
aiofiles.os.path.lexists = wrap(os.path.lexists) # type: ignore[attr-defined]
async_rmtree = wrap(shutil.rmtree) # type: ignore[attr-defined]
async_move = wrap(shutil.move) # type: ignore[attr-defined]
async_copyfile = wrap(shutil.copyfile) # type: ignore[attr-defined]


def _copy_to_fobj(fs, path1, fdst):
with fs.open(path1, "rb") as fsrc:
shutil.copyfileobj(fsrc, fdst)


async_copy_to_fobj = wrap(_copy_to_fobj)
async_copy_to_fobj = wrap(LocalFileSystem.get_file)


async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
Expand All @@ -36,40 +24,28 @@ async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
await fdst_write(buf)


# pylint: disable=arguments-renamed


def wrapped(func):
@functools.wraps(func)
def inner(self, *args, **kwargs):
return func(self, *args, **kwargs)
# pylint: disable=abstract-method

return inner


class AsyncLocalFileSystem(AsyncFileSystem): # pylint: disable=abstract-method
find = wrapped(AbstractFileSystem.find)
walk = wrapped(AbstractFileSystem.walk)
exists = wrapped(AbstractFileSystem.exists)
isdir = wrapped(AbstractFileSystem.isdir)
isfile = wrapped(AbstractFileSystem.isfile)
lexists = staticmethod(LocalFileSystem.lexists)

ls = wrapped(LocalFileSystem.ls)
info = wrapped(LocalFileSystem.info)
class AsyncLocalFileSystem(AsyncFileSystem, LocalFileSystem):
_info = wrap(LocalFileSystem.info)
_lexists = wrap(LocalFileSystem.lexists)
_created = wrap(LocalFileSystem.created)
_modified = wrap(LocalFileSystem.modified)
_chmod = wrap(LocalFileSystem.chmod)
_mv_file = wrap(LocalFileSystem.mv_file)
_makedirs = wrap(LocalFileSystem.makedirs)
_rmdir = wrap(LocalFileSystem.rmdir)
_rm_file = wrap(LocalFileSystem.rm_file)

async def _ls(self, path, detail=True, **kwargs):
path = self._strip_protocol(path)
if detail:
entries = await aiofiles.os.scandir(path)
return [await self._info(f) for f in entries]
return [os.path.join(path, f) for f in await aiofiles.os.listdir(path)]

async def _rm_file(self, path, **kwargs):
await aiofiles.os.remove(path)

async def _rmdir(self, path):
await aiofiles.os.rmdir(path)
with await aiofiles.os.scandir(path) as entries:
return [await self._info(f) for f in entries]
return [
posixpath.join(path, f) for f in await aiofiles.os.listdir(path)
]

async def _mkdir(self, path, create_parents=True, **kwargs):
if create_parents:
Expand All @@ -78,11 +54,9 @@ async def _mkdir(self, path, create_parents=True, **kwargs):
errno.EEXIST, os.strerror(errno.EEXIST), path
)
return await self._makedirs(path, exist_ok=True)
path = self._strip_protocol(path)
await aiofiles.os.mkdir(path)

async def _makedirs(self, path, exist_ok=False):
await aiofiles.os.makedirs(path, exist_ok=exist_ok)

async def _cat_file(self, path, start=None, end=None, **kwargs):
async with self.open_async(path, "rb") as f:
if start is not None:
Expand All @@ -100,76 +74,71 @@ async def _pipe_file(self, path, value, **kwargs):
async with self.open_async(path, "wb") as f:
await f.write(value)

async def _put_file(self, path1, path2, **kwargs):
await self._cp_file(path1, path2, **kwargs)

async def _get_file(self, path1, path2, **kwargs):
async def _get_file( # pylint: disable=arguments-renamed
self, path1, path2, **kwargs
):
write_method = getattr(path2, "write", None)
if not write_method:
return await self._cp_file(path1, path2, **kwargs)
if isinstance(
path2, AbstractBufferedFile
path2, AbstractAsyncStreamedFile
) or asyncio.iscoroutinefunction(write_method):
async with self.open_async(path1, "rb") as fsrc:
return await async_copy_to_fobj(fsrc, path2)
return await async_copy_to_fobj(path1, path2)
return await copy_asyncfileobj(fsrc, path2)

path1 = self._strip_protocol(path1)
return await async_copy_to_fobj(self, path1, path2)

async def _cp_file(self, path1, path2, **kwargs):
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
if await self._isfile(path1):
return await async_copyfile(path1, path2)
if await self._isdir(path1):
return await self._makedirs(path2, exist_ok=True)
raise FileNotFoundError

async def _mv_file(self, path1, path2, **kwargs):
await async_move(path1, path2)

async def _lexists(self, path, **kwargs):
return await aiofiles.os.path.lexists(path)
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path1)

async def _created(self, path):
info = await self._info(path=path)
return datetime.datetime.utcfromtimestamp(info["created"])

async def _modified(self, path):
info = await self._info(path=path)
return datetime.datetime.utcfromtimestamp(info["mtime"])
_put_file = _cp_file

async def _rm(
self, path, recursive=False, maxdepth=None
): # pylint: disable=arguments-differ, unused-argument
if isinstance(path, str):
self, path, recursive=False, batch_size=None, maxdepth=None, **kwargs
):
if isinstance(path, (str, os.PathLike)):
path = [path]

assert not maxdepth and not batch_size
for p in path:
p = self._strip_protocol(p)
if recursive and await self._isdir(p):
if os.path.abspath(p) == os.getcwd():
raise ValueError("Cannot delete current working directory")
await async_rmtree(p)
else:
await aiofiles.os.remove(p)

async def _chmod(self, path, mode):
await aiofiles.os.chmod(path, mode)

async def _link(self, src, dst):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.link(src, dst)

async def _symlink(self, src, dst):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.symlink(src, dst)

async def _islink(self, path):
path = self._strip_protocol(path)
return await aiofiles.os.path.islink(path)

async def _touch(self, path, **kwargs):
if self._exists(path):
if await self._exists(path):
path = self._strip_protocol(path)
return await aiofiles.os.utime(path, None)
async with self.open_async(path, "a"):
pass

_open = LocalFileSystem._open # pylint: disable=protected-access

def open_async( # pylint: disable=invalid-overridden-method
def open_async(
self, path, mode="rb", **kwargs
):
): # pylint: disable=invalid-overridden-method
path = self._strip_protocol(path)
return aiofile.async_open(path, mode, **kwargs)
Loading