
[WIP] Add concatenated memmap array cache #16

Open
wants to merge 22 commits into base: main
1 change: 1 addition & 0 deletions .gitignore
@@ -112,6 +112,7 @@ ENV/
exp_local
outputs
data
tmp

# adding output from unit-tests here
*-raw.fif
166 changes: 117 additions & 49 deletions exca/cachedict.py
@@ -7,7 +7,7 @@
"""
Disk, RAM caches
"""
import hashlib
import json
import logging
import shutil
import subprocess
@@ -16,23 +16,14 @@
from pathlib import Path

from . import utils
from .dumperloader import DumperLoader
from .dumperloader import DumperLoader, StaticDumperLoader, _string_uid, host_pid

X = tp.TypeVar("X")
Y = tp.TypeVar("Y")

UNSAFE_TABLE = {ord(char): "-" for char in "/\\\n\t "}
logger = logging.getLogger(__name__)


def _string_uid(string: str) -> str:
out = string.translate(UNSAFE_TABLE)
if len(out) > 80:
out = out[:40] + "[.]" + out[-40:]
h = hashlib.md5(string.encode("utf8")).hexdigest()[:8]
return f"{out}-{h}"


class CacheDict(tp.Generic[X]):
"""Dictionary-like object that caches and loads data on disk and ram.

@@ -66,8 +57,9 @@ class CacheDict(tp.Generic[X]):

Note
----
Each item is cached as 1 file, with an additional .key file with the same name holding
the actual key for the item (which can differ from the file name)
Dicts write to .jsonl files that hold the keys and how to read the
corresponding items. Different threads write to different jsonl
files to avoid interference.
"""

def __init__(
@@ -76,10 +68,13 @@ def __init__(
keep_in_ram: bool = False,
cache_type: None | str = None,
permissions: int | None = 0o777,
_write_legacy_key_files: bool = True,
) -> None:
self.folder = None if folder is None else Path(folder)
self.permissions = permissions
self._keep_in_ram = keep_in_ram
self._write_legacy_key_files = _write_legacy_key_files
self._loaders: dict[str, DumperLoader] = {} # loaders are persistent
if self.folder is None and not keep_in_ram:
raise ValueError("At least folder or keep_in_ram should be activated")

@@ -91,10 +86,14 @@ def __init__(
except Exception as e:
msg = f"Failed to set permission to {self.permissions} on {self.folder}\n({e})"
logger.warning(msg)
self._ram_data: tp.Dict[str, X] = {}
self._key_uid: tp.Dict[str, str] = {}
self._ram_data: dict[str, X] = {}
self._key_info: dict[str, dict[str, tp.Any]] = {}
self.cache_type = cache_type
self._set_cache_type(cache_type)
self._informed_size: int | None = None

def inform_size(self, size: int) -> None:
self._informed_size = size

def __repr__(self) -> str:
name = self.__class__.__name__
@@ -103,7 +102,7 @@ def __repr__(self) -> str:

def clear(self) -> None:
self._ram_data.clear()
self._key_uid.clear()
self._key_info.clear()
if self.folder is not None:
# let's remove content but not the folder to keep same permissions
for sub in self.folder.iterdir():
@@ -118,26 +117,52 @@ def __len__(self) -> int:
def keys(self) -> tp.Iterator[str]:
keys = set(self._ram_data)
if self.folder is not None:
folder = Path(self.folder)
# read all existing key files as fast as possible (pathlib.glob is slow)
find_cmd = 'find . -type f -name "*.key"'
try:
out = subprocess.check_output(
'find . -type f -name "*.key"', shell=True, cwd=self.folder
).decode("utf8")
out = subprocess.check_output(find_cmd, shell=True, cwd=folder)
except subprocess.CalledProcessError as e:
out = e.output.decode("utf8") # stderr contains missing tmp files
names = out.splitlines()
out = e.output
names = out.decode("utf8").splitlines()
jobs = {}
# parallelize content reading
with futures.ThreadPoolExecutor() as ex:
jobs = {
name[:-4]: ex.submit((self.folder / name).read_text, "utf8")
name[:-4]: ex.submit((folder / name).read_text, "utf8")
for name in names
if name[:-4] not in self._key_uid
if name[:-4] not in self._key_info
}
self._key_uid.update({j.result(): name for name, j in jobs.items()})
keys |= set(self._key_uid)
self._key_info.update({j.result(): {"uid": name} for name, j in jobs.items()})
self._load_info_files()
keys |= set(self._key_info)
return iter(keys)

def _load_info_files(self) -> None:
if self.folder is None:
return
folder = Path(self.folder)
# read all existing jsonl files
find_cmd = 'find . -type f -name "*-info.jsonl"'
try:
out = subprocess.check_output(find_cmd, shell=True, cwd=folder)
except subprocess.CalledProcessError as e:
out = e.output # stderr contains missing tmp files
names = out.decode("utf8").splitlines()
for name in names:
fp = folder / name
num = 0
with fp.open("rb") as f:
for line in f:
count = len(line)
line = line.strip()
if not line:
continue
info = json.loads(line.decode("utf8"))
info.update(_jsonl=fp, _byterange=(num, num + count))
self._key_info[info.pop("_key")] = info
num += count

def values(self) -> tp.Iterable[X]:
for key in self:
yield self[key]
@@ -156,9 +181,9 @@ def __getitem__(self, key: str) -> X:
# necessarily in file cache folder from now on
if self.folder is None:
raise RuntimeError("This should not happen")
if key not in self._key_uid:
if key not in self._key_info:
_ = key in self
if key not in self._key_uid:
if key not in self._key_info:
# trigger folder cache update:
# https://stackoverflow.com/questions/3112546/os-path-exists-lies/3112717
self.folder.chmod(self.folder.stat().st_mode)
@@ -167,13 +192,27 @@ def __getitem__(self, key: str) -> X:
self.check_cache_type()
if self.cache_type is None:
raise RuntimeError(f"Could not figure cache_type in {self.folder}")
uid = self._key_uid[key]
loader = DumperLoader.CLASSES[self.cache_type]
loaded = loader.load(self.folder / uid)
info = self._key_info[key]
loader = self._get_loader()
loaded = loader.load(**{x: y for x, y in info.items() if not x.startswith("_")})
if self._keep_in_ram:
self._ram_data[key] = loaded # type: ignore
self._ram_data[key] = loaded
return loaded # type: ignore

def _get_loader(self) -> DumperLoader:
key = host_pid() # make sure we don't use a loader from another thread
if self.folder is None:
raise RuntimeError("Cannot get loader with no folder")
if self.cache_type is None:
raise RuntimeError("Shouldn't get called with no cache type")
if key not in self._loaders:
self._loaders[key] = DumperLoader.CLASSES[self.cache_type](self.folder)
if self._informed_size is not None:
if hasattr(self._loaders[key], "size"): # HACKY!
self._loaders[key].size = self._informed_size # type: ignore
self._informed_size = None
return self._loaders[key]

def _set_cache_type(self, cache_type: str | None) -> None:
if self.folder is None:
return # not needed
@@ -208,25 +247,37 @@ def __setitem__(self, key: str, value: X) -> None:
if self._keep_in_ram and self.folder is None:
self._ram_data[key] = value
if self.folder is not None:
uid = _string_uid(key) # use a safe mapping
self._key_uid[key] = uid
dumper = DumperLoader.CLASSES[self.cache_type]()
dumper.dump(self.folder / uid, value)
dumpfile = dumper.filepath(self.folder / uid)
keyfile = self.folder / (uid + ".key")
keyfile.write_text(key, encoding="utf8")
dumper = self._get_loader()
info = dumper.dump(key, value)
files = [self.folder / info["filename"]]
if self._write_legacy_key_files and isinstance(dumper, StaticDumperLoader):
keyfile = self.folder / (info["filename"][: -len(dumper.SUFFIX)] + ".key")
keyfile.write_text(key, encoding="utf8")
files.append(keyfile)
# new jsonl-based write (in addition to the legacy key file above)
info["_key"] = key
info_fp = Path(self.folder) / f"{host_pid()}-info.jsonl"
if not info_fp.exists():
# no need to update the file permission if pre-existing
files.append(info_fp)
with info_fp.open("ab") as f:
b = json.dumps(info).encode("utf8")
current = f.tell()
f.write(b + b"\n")
info.update(_jsonl=info_fp, _byterange=(current, current + len(b) + 1))
self._key_info[key] = info
# reading will reload to in-memory cache if need be
# (since dumping may have loaded the underlying data, let's not keep it)
if self.permissions is not None:
for fp in [dumpfile, keyfile]:
for fp in files:
try:
fp.chmod(self.permissions)
except Exception: # pylint: disable=broad-except
pass # avoid issues in case of overlapping processes

def __delitem__(self, key: str) -> None:
# necessarily in file cache folder from now on
if key not in self._key_uid:
if key not in self._key_info:
_ = key in self
self._ram_data.pop(key, None)
if self.folder is None:
@@ -235,26 +286,43 @@ def __delitem__(self, key: str) -> None:
self.check_cache_type()
if self.cache_type is None:
raise RuntimeError(f"Could not figure cache_type in {self.folder}")
uid = self._key_uid.pop(key)
keyfile = self.folder / (uid + ".key")
keyfile.unlink()
dumper = DumperLoader.CLASSES[self.cache_type]()
fp = dumper.filepath(self.folder / uid)
with utils.fast_unlink(fp): # moves then delete to avoid weird effects
pass
loader = self._get_loader()
info = self._key_info.pop(key)
if isinstance(loader, StaticDumperLoader):
keyfile = self.folder / (info["filename"][: -len(loader.SUFFIX)] + ".key")
keyfile.unlink(missing_ok=True)
if "_jsonl" in info:
jsonl = Path(info["_jsonl"])
brange = info["_byterange"]
# overwrite the record with whitespace (keeps the byte ranges of later records valid)
with jsonl.open("rb+") as f:
f.seek(brange[0])
f.write(b" " * (brange[1] - brange[0] - 1) + b"\n")
info = {x: y for x, y in info.items() if not x.startswith("_")}
if len(info) == 1: # only filename -> we can remove it as it is not shared
# moves then delete to avoid weird effects
with utils.fast_unlink(Path(self.folder) / info["filename"]):
pass

def __contains__(self, key: str) -> bool:
# in-memory cache
if key in self._ram_data:
return True
self._load_info_files()
if self.folder is not None:
# in folder (already read once)
if key in self._key_uid:
if key in self._key_info:
return True
# maybe in folder (never read it)
uid = _string_uid(key)
fp = self.folder / f"{uid}.key"
if fp.exists():
self._key_uid[key] = uid
loader = self._get_loader()
if not isinstance(loader, StaticDumperLoader):
raise RuntimeError(
"Cannot regenerate info from non-static dumper-loader"
)
filename = _string_uid(key) + loader.SUFFIX
self._key_info[key] = {"filename": filename}
return True
return False # lazy check
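
A minimal usage sketch of the jsonl-backed cache shown in this diff (illustrative only: the folder path and the "MemmapArrayFile" cache_type string are assumptions, not values taken from the PR):

import numpy as np

from exca.cachedict import CacheDict

# hypothetical cache_type name standing in for the concatenated-memmap loader
cache = CacheDict(folder="tmp/cache", keep_in_ram=False, cache_type="MemmapArrayFile")
cache["sample"] = np.arange(10)  # appends a JSON record to this process's *-info.jsonl file
assert "sample" in cache  # resolved through the in-memory _key_info index
arr = cache["sample"]  # reloaded through the per-process DumperLoader instance
del cache["sample"]  # blanks the record's byte range in the jsonl file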