Skip to content

Commit

Permalink
Use fsencoded paths on *nix PyFilesystem#120
Browse files Browse the repository at this point in the history
The approach is that unicode is used everywhere unless when on *nix
and that real access to files is needed. In this case the patch is
encoded to bytes using the filesystem encoding.

Signed-off-by: Philippe Ombredanne <[email protected]>
  • Loading branch information
pombredanne committed Dec 20, 2017
1 parent ce6ae08 commit 8f13427
Showing 1 changed file with 92 additions and 40 deletions.
132 changes: 92 additions & 40 deletions fs/osfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import os
import platform
import stat
import sys

import six

Expand All @@ -41,8 +40,11 @@

log = logging.getLogger('fs.osfs')


_WINDOWS_PLATFORM = platform.system() == 'Windows'
ps = platform.system()
_WINDOWS_PLATFORM = ps == 'Windows'
_MAC_PLATFORM = ps == 'Darwin'
_NIX_PLATFORM = ps != _WINDOWS_PLATFORM and ps != _MAC_PLATFORM
del ps


@six.python_2_unicode_compatible
Expand Down Expand Up @@ -77,23 +79,27 @@ def __init__(self,
"""Create an OSFS instance.
"""
super(OSFS, self).__init__()
root_path = fsdecode(fspath(root_path))
_root_path = os.path.expanduser(os.path.expandvars(root_path))
_root_path = os.path.normpath(os.path.abspath(_root_path))
self.root_path = _root_path
_root_path_native = fsencode(fspath(root_path))
_root_path_native = os.path.expandvars(_root_path_native)
_root_path_native = os.path.expanduser(_root_path_native)
_root_path_native = os.path.abspath(_root_path_native)
_root_path_native = os.path.normpath(_root_path_native)

self.root_path_native = _root_path_native
self.root_path = fsdecode(_root_path_native)

if create:
try:
if not os.path.isdir(_root_path):
os.makedirs(_root_path, mode=create_mode)
if not os.path.isdir(_root_path_native):
os.makedirs(_root_path_native, mode=create_mode)
except OSError as error:
raise errors.CreateFailed(
'unable to create {} ({})'.format(root_path, error)
)
else:
if not os.path.isdir(_root_path):
if not os.path.isdir(_root_path_native):
raise errors.CreateFailed(
'root path does not exist'
'root path does not exist or is file'
)

_meta = self._meta = {
Expand All @@ -107,15 +113,15 @@ def __init__(self,
}

if _WINDOWS_PLATFORM: # pragma: nocover
_meta["invalid_path_chars"] =\
_meta["invalid_path_chars"] = \
''.join(six.unichr(n) for n in range(31)) + '\\:*?"<>|'
else:
_meta["invalid_path_chars"] = '\0'

if 'PC_PATH_MAX' in os.pathconf_names:
_meta['max_sys_path_length'] = (
os.pathconf(
fsencode(_root_path),
_root_path_native,
os.pathconf_names['PC_PATH_MAX']
)
)
Expand All @@ -130,13 +136,32 @@ def __str__(self):
return fmt.format(self.__class__.__name__.lower(),
self.root_path)

def _to_sys_path(self, path):
def _to_sys_path(self, path, as_bytes=False):
"""Convert a FS path to a path on the OS.
If `as_bytes` is True, return bytes on *nix and unicode elsewhere.
"""
root_path = self.root_path
sep = '/'
os_sep = os.sep

if _NIX_PLATFORM:
root_path = self.root_path_native
path = path and fsencode(path) or path
sep = six.binary_type(sep)
os_sep = six.binary_type(os_sep)

sys_path = os.path.join(
self.root_path,
path.lstrip('/').replace('/', os.sep)
root_path,
path.lstrip(sep).replace(sep, os_sep)
)
if as_bytes:
if _NIX_PLATFORM:
return sys_path
else:
return fsencode(sys_path)

if _NIX_PLATFORM:
return fsdecode(sys_path)
return sys_path

@classmethod
Expand Down Expand Up @@ -209,25 +234,30 @@ def _get_type_from_stat(cls, _stat):
# --------------------------------------------------------

def _gettarget(self, sys_path):
if _NIX_PLATFORM:
sys_path = fsencode(sys_path)
try:
target = os.readlink(sys_path)
except OSError:
return None
else:
if _NIX_PLATFORM and target:
target = fsdecode(target)
return target

def _make_link_info(self, sys_path):
_target = self._gettarget(sys_path)
link = {
'target': _target,
'target': _target and fsdecode(_target) or _target,
}
return link

def getinfo(self, path, namespaces=None):
self.check()
namespaces = namespaces or ()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self.getsyspath(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
_lstat = None
with convert_os_errors('getinfo', path):
_stat = os.stat(sys_path)
Expand Down Expand Up @@ -261,17 +291,19 @@ def getinfo(self, path, namespaces=None):

def listdir(self, path):
self.check()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('listdir', path, directory=True):
names = os.listdir(sys_path)
names = [fsdecode(f) for f in os.listdir(sys_path)]
return names

def makedir(self, path, permissions=None, recreate=False):
self.check()
mode = Permissions.get_mode(permissions)
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('makedir', path, directory=True):
try:
os.mkdir(sys_path, mode)
Expand All @@ -288,8 +320,9 @@ def openbin(self, path, mode="r", buffering=-1, **options):
_mode = Mode(mode)
_mode.validate_bin()
self.check()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('openbin', path):
if six.PY2 and _mode.exclusive and self.exists(path):
raise errors.FileExists(path)
Expand All @@ -303,56 +336,66 @@ def openbin(self, path, mode="r", buffering=-1, **options):

def remove(self, path):
self.check()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('remove', path):
try:
os.remove(sys_path)
except OSError as error:
if error.errno == errno.EACCES and sys.platform == "win32":
if error.errno == errno.EACCES and _WINDOWS_PLATFORM:
# sometimes windows says this for attempts to remove a dir
if os.path.isdir(sys_path): # pragma: nocover
raise errors.FileExpected(path)
if error.errno == errno.EPERM and sys.platform == "darwin":
if error.errno == errno.EPERM and _MAC_PLATFORM:
# sometimes OSX says this for attempts to remove a dir
if os.path.isdir(sys_path): # pragma: nocover
raise errors.FileExpected(path)
raise

def removedir(self, path):
self.check()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
if _path == '/':
raise errors.RemoveRootError()
sys_path = self._to_sys_path(path)
sys_path = self._to_sys_path(path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('removedir', path, directory=True):
os.rmdir(sys_path)

# --------------------------------------------------------
# Optional Methods
# --------------------------------------------------------

def getsyspath(self, path):
sys_path = self._to_sys_path(path)
def getsyspath(self, path, as_bytes=False):
path = path and fsdecode(path) or path
sys_path = self._to_sys_path(path, as_bytes=as_bytes)
return sys_path

def geturl(self, path, purpose='download'):
if purpose != 'download':
raise NoURL(path, purpose)
path = path and fsdecode(path) or path
syspath = self.getsyspath(path, as_bytes=_NIX_PLATFORM)
if _NIX_PLATFORM:
syspath = fsdecode(syspath)
# FIXME: segments might need to be URL/percent-encoded instead
return "file://" + self.getsyspath(path)

def gettype(self, path):
self.check()
sys_path = self._to_sys_path(path)
path = path and fsdecode(path) or path
sys_path = self._to_sys_path(path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('gettype', path):
stat = os.stat(sys_path)
resource_type = self._get_type_from_stat(stat)
return resource_type

def islink(self, path):
self.check()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
if not self.exists(path):
raise errors.ResourceNotFound(path)
with convert_os_errors('islink', path):
Expand All @@ -370,8 +413,9 @@ def open(self,
_mode = Mode(mode)
validate_open_mode(mode)
self.check()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('open', path):
if six.PY2 and _mode.exclusive and self.exists(path):
raise FileExists(path)
Expand All @@ -388,8 +432,9 @@ def open(self,

def setinfo(self, path, info):
self.check()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
if not os.path.exists(sys_path):
raise errors.ResourceNotFound(path)
if 'details' in info:
Expand All @@ -407,13 +452,14 @@ def setinfo(self, path, info):
def _scandir(self, path, namespaces=None):
self.check()
namespaces = namespaces or ()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('scandir', path, directory=True):
for dir_entry in scandir(sys_path):
info = {
"basic": {
"name": dir_entry.name,
"name": fsdecode(dir_entry.name),
"is_dir": dir_entry.is_dir()
}
}
Expand All @@ -434,12 +480,15 @@ def _scandir(self, path, namespaces=None):
for k in dir(lstat_result) if k.startswith('st_')
}
if 'link' in namespaces:
dir_entry_name = dir_entry.name
if _NIX_PLATFORM:
dir_entry_name = fsencode(dir_entry_name)
info['link'] = self._make_link_info(
os.path.join(sys_path, dir_entry.name)
fsdecode(os.path.join(sys_path, dir_entry_name))
)
if 'access' in namespaces:
stat_result = dir_entry.stat()
info['access'] =\
info['access'] = \
self._make_access_from_stat(stat_result)

yield Info(info)
Expand All @@ -449,15 +498,17 @@ def _scandir(self, path, namespaces=None):
def _scandir(self, path, namespaces=None):
self.check()
namespaces = namespaces or ()
path = path and fsdecode(path) or path
_path = self.validatepath(path)
sys_path = self._to_sys_path(_path)
sys_path = self._to_sys_path(_path, as_bytes=_NIX_PLATFORM)
with convert_os_errors('scandir', path, directory=True):
for entry_name in os.listdir(sys_path):
entry_name_u = fsdecode(entry_name)
entry_path = os.path.join(sys_path, entry_name)
stat_result = os.stat(entry_path)
info = {
"basic": {
"name": entry_name,
"name": entry_name_u,
"is_dir": stat.S_ISDIR(stat_result.st_mode),
}
}
Expand All @@ -477,16 +528,17 @@ def _scandir(self, path, namespaces=None):
}
if 'link' in namespaces:
info['link'] = self._make_link_info(
os.path.join(sys_path, entry_name)
fsdecode(os.path.join(sys_path, entry_name))
)
if 'access' in namespaces:
info['access'] =\
info['access'] = \
self._make_access_from_stat(stat_result)

yield Info(info)


def scandir(self, path, namespaces=None, page=None):
path = path and fsdecode(path) or path
iter_info = self._scandir(path, namespaces=namespaces)
if page is not None:
start, end = page
Expand Down

0 comments on commit 8f13427

Please sign in to comment.