[WIP] Experiment with write context for efficiency #18

Open
wants to merge 36 commits into base: main
1 change: 1 addition & 0 deletions .gitignore
@@ -112,6 +112,7 @@ ENV/
exp_local
outputs
data
tmp

# adding output from unit-tests here
*-raw.fif
241 changes: 179 additions & 62 deletions exca/cachedict.py
@@ -7,7 +7,10 @@
"""
Disk, RAM caches
"""
import hashlib
import contextlib
import dataclasses
import io
import json
import logging
import shutil
import subprocess
@@ -16,21 +19,19 @@
from pathlib import Path

from . import utils
from .dumperloader import DumperLoader
from .dumperloader import DumperLoader, StaticDumperLoader, _string_uid, host_pid

X = tp.TypeVar("X")
Y = tp.TypeVar("Y")

UNSAFE_TABLE = {ord(char): "-" for char in "/\\\n\t "}
logger = logging.getLogger(__name__)


def _string_uid(string: str) -> str:
out = string.translate(UNSAFE_TABLE)
if len(out) > 80:
out = out[:40] + "[.]" + out[-40:]
h = hashlib.md5(string.encode("utf8")).hexdigest()[:8]
return f"{out}-{h}"
@dataclasses.dataclass
class DumpInfo:
byte_range: tuple[int, int]
jsonl: Path
cache_type: str


class CacheDict(tp.Generic[X]):
@@ -66,8 +67,9 @@ class CacheDict(tp.Generic[X]):

Note
----
Each item is cached as 1 file, with an additional .key file with the same name holding
the actual key for the item (which can differ from the file name)
Dicts write to .jsonl files to hold keys and how to read the
corresponding item. Different threads write to different jsonl
files to avoid interference.
"""

def __init__(
@@ -76,10 +78,13 @@
keep_in_ram: bool = False,
cache_type: None | str = None,
permissions: int | None = 0o777,
_write_legacy_key_files: bool = False,
) -> None:
self.folder = None if folder is None else Path(folder)
self.permissions = permissions
self._keep_in_ram = keep_in_ram
self._write_legacy_key_files = _write_legacy_key_files
self._loaders: dict[tuple[str, str], DumperLoader] = {} # loaders are persistent
if self.folder is None and not keep_in_ram:
raise ValueError("At least folder or keep_in_ram should be activated")

@@ -91,10 +96,15 @@ def __init__(
except Exception as e:
msg = f"Failed to set permission to {self.permissions} on {self.folder}\n({e})"
logger.warning(msg)
self._ram_data: tp.Dict[str, X] = {}
self._key_uid: tp.Dict[str, str] = {}
self._ram_data: dict[str, X] = {}
self._key_info: dict[str, dict[str, tp.Any]] = {}
self.cache_type = cache_type
self._set_cache_type(cache_type)
# write mode
self._info_f: io.BufferedWriter | None = None
self._info_fp: Path | None = None
self._estack: contextlib.ExitStack | None = None
self._dumper: DumperLoader | None = None

def __repr__(self) -> str:
name = self.__class__.__name__
@@ -103,7 +113,7 @@ def __repr__(self) -> str:

def clear(self) -> None:
self._ram_data.clear()
self._key_uid.clear()
self._key_info.clear()
if self.folder is not None:
# let's remove content but not the folder to keep same permissions
for sub in self.folder.iterdir():
@@ -118,26 +128,73 @@ def __len__(self) -> int:
def keys(self) -> tp.Iterator[str]:
keys = set(self._ram_data)
if self.folder is not None:
folder = Path(self.folder)
# read all existing key files as fast as possible (pathlib.glob is slow)
find_cmd = 'find . -type f -name "*.key"'
try:
out = subprocess.check_output(
'find . -type f -name "*.key"', shell=True, cwd=self.folder
).decode("utf8")
out = subprocess.check_output(find_cmd, shell=True, cwd=folder)
except subprocess.CalledProcessError as e:
out = e.output.decode("utf8") # stderr contains missing tmp files
names = out.splitlines()
out = e.output
names = out.decode("utf8").splitlines()
jobs = {}
if self.cache_type is None:
self._set_cache_type(None)
if names and self.cache_type is None:
raise RuntimeError("cache_type should have been detected")
# parallelize content reading
with futures.ThreadPoolExecutor() as ex:
jobs = {
name[:-4]: ex.submit((self.folder / name).read_text, "utf8")
name[:-4]: ex.submit((folder / name).read_text, "utf8")
for name in names
if name[:-4] not in self._key_uid
if name[:-4] not in self._key_info
}
self._key_uid.update({j.result(): name for name, j in jobs.items()})
keys |= set(self._key_uid)
info = {
j.result(): {
"uid": name,
"_dump_info": DumpInfo(
byte_range=(0, 0),
jsonl=folder / (name + ".key"),
cache_type=self.cache_type, # type: ignore
),
}
for name, j in jobs.items()
}
self._key_info.update(info)
self._load_info_files()
keys |= set(self._key_info)
return iter(keys)

def _load_info_files(self) -> None:
if self.folder is None:
return
folder = Path(self.folder)
# read all existing jsonl files
find_cmd = 'find . -type f -name "*-info.jsonl"'
try:
out = subprocess.check_output(find_cmd, shell=True, cwd=folder)
except subprocess.CalledProcessError as e:
out = e.output # stderr contains missing tmp files
names = out.decode("utf8").splitlines()
for name in names:
fp = folder / name
last = 0
meta = {}
with fp.open("rb") as f:
for k, line in enumerate(f):
count = len(line)
last = last + count
line = line.strip()
if not line:
continue
info = json.loads(line.decode("utf8"))
if not k: # metadata
meta = info
continue
dinfo = DumpInfo(jsonl=fp, byte_range=(last - count, last), **meta)
info["_dump_info"] = dinfo
self._key_info[info.pop("_key")] = info

def values(self) -> tp.Iterable[X]:
for key in self:
yield self[key]
@@ -156,105 +213,165 @@ def __getitem__(self, key: str) -> X:
# necessarily in file cache folder from now on
if self.folder is None:
raise RuntimeError("This should not happen")
if key not in self._key_uid:
if key not in self._key_info:
_ = key in self
if key not in self._key_uid:
if key not in self._key_info:
# trigger folder cache update:
# https://stackoverflow.com/questions/3112546/os-path-exists-lies/3112717
self.folder.chmod(self.folder.stat().st_mode)
_ = key in self
if self.cache_type is None:
self.check_cache_type()
if self.cache_type is None:
raise RuntimeError(f"Could not figure cache_type in {self.folder}")
uid = self._key_uid[key]
loader = DumperLoader.CLASSES[self.cache_type]
loaded = loader.load(self.folder / uid)
info = self._key_info[key]
dinfo: DumpInfo = info["_dump_info"]
loader = self._get_loader(dinfo.cache_type)
loaded = loader.load(**{x: y for x, y in info.items() if not x.startswith("_")})
if self._keep_in_ram:
self._ram_data[key] = loaded # type: ignore
self._ram_data[key] = loaded
return loaded # type: ignore

def _get_loader(self, cache_type: str) -> DumperLoader:
key = (
cache_type,
host_pid(),
) # make sure we don't use a loader from another thread
if self.folder is None:
raise RuntimeError("Cannot get loader with no folder")
if key not in self._loaders:
self._loaders[key] = DumperLoader.CLASSES[cache_type](self.folder)
return self._loaders[key]

def _set_cache_type(self, cache_type: str | None) -> None:
if self.folder is None:
return # not needed
fp = self.folder / ".cache_type"
if cache_type is None:
if fp.exists():
cache_type = fp.read_text()
if cache_type not in DumperLoader.CLASSES:
logger.warning("Ignoring cache_type file providing: %s", cache_type)
cache_type = None
self.check_cache_type(cache_type)
if cache_type is not None:
DumperLoader.check_valid_cache_type(cache_type)
self.cache_type = cache_type
if not fp.exists():
self.folder.mkdir(exist_ok=True)
fp.write_text(cache_type)
if self.permissions is not None:
fp.chmod(self.permissions)

@staticmethod
def check_cache_type(cache_type: None | str = None) -> None:
if cache_type is not None:
if cache_type not in DumperLoader.CLASSES:
avail = list(DumperLoader.CLASSES)
raise ValueError(f"Unknown {cache_type=}, use one of {avail}")
@contextlib.contextmanager
def write_mode(self) -> tp.Iterator[None]:
if self.folder is None:
yield
return
info_fp = Path(self.folder) / f"{host_pid()}-info.jsonl"
try:
with contextlib.ExitStack() as estack:
f = estack.enter_context(info_fp.open("ab"))
self._estack = estack
self._info_f = f
self._info_fp = info_fp
yield
finally:
self._info_f = None
self._estack = None
self._info_fp = None
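
With this change, writes must happen inside the write_mode() context, which keeps the per-writer info file (and the dumper's own write context) open across many assignments instead of touching files once per item. A minimal usage sketch, assuming a dumper-loader is registered for the stored value type; the folder path is illustrative.

from exca.cachedict import CacheDict

cache: CacheDict[bytes] = CacheDict(folder="/tmp/exca-demo", keep_in_ram=False)
with cache.write_mode():  # opens "<host>-<pid>-info.jsonl" once for all the writes below
    for i in range(3):
        cache[f"item-{i}"] = f"value-{i}".encode()
print(sorted(cache.keys()))  # keys are read back from the jsonl index files
print(cache["item-0"])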

def __setitem__(self, key: str, value: X) -> None:
if self.cache_type is None:
cls = DumperLoader.default_class(type(value))
self.cache_type = cls.__name__
self._set_cache_type(self.cache_type)

if self._keep_in_ram and self.folder is None:
self._ram_data[key] = value
if self.folder is not None:
uid = _string_uid(key) # use a safe mapping
self._key_uid[key] = uid
dumper = DumperLoader.CLASSES[self.cache_type]()
dumper.dump(self.folder / uid, value)
dumpfile = dumper.filepath(self.folder / uid)
keyfile = self.folder / (uid + ".key")
keyfile.write_text(key, encoding="utf8")
if self._info_f is None or self._estack is None or self._info_fp is None:
raise RuntimeError("Cannot write without a write_mode context")
if self._dumper is None:
self._dumper = self._get_loader(self.cache_type)
self._estack.enter_context(self._dumper.write_mode())
info = self._dumper.dump(key, value)
files = [self.folder / info["filename"]]
if self._write_legacy_key_files and isinstance(
self._dumper, StaticDumperLoader
):
keyfile = self.folder / (
info["filename"][: -len(self._dumper.SUFFIX)] + ".key"
)
keyfile.write_text(key, encoding="utf8")
files.append(keyfile)
# new write
info["_key"] = key
meta = {"cache_type": self.cache_type}
if not self._info_f.tell():
self._info_f.write(json.dumps(meta).encode("utf8") + b"\n")
b = json.dumps(info).encode("utf8")
current = self._info_f.tell()
self._info_f.write(b + b"\n")
dinfo = DumpInfo(
jsonl=self._info_fp, byte_range=(current, current + len(b) + 1), **meta
)
info["_dump_info"] = dinfo
self._key_info[key] = info
# reading will reload to in-memory cache if need be
# (since dumping may have loaded the underlying data, let's not keep it)
if self.permissions is not None:
for fp in [dumpfile, keyfile]:
for fp in files:
try:
fp.chmod(self.permissions)
except Exception: # pylint: disable=broad-except
pass # avoid issues in case of overlapping processes

def __delitem__(self, key: str) -> None:
# necessarily in file cache folder from now on
if key not in self._key_uid:
if key not in self._key_info:
_ = key in self
self._ram_data.pop(key, None)
if self.folder is None:
return
if self.cache_type is None:
self.check_cache_type()
if self.cache_type is None:
raise RuntimeError(f"Could not figure cache_type in {self.folder}")
uid = self._key_uid.pop(key)
keyfile = self.folder / (uid + ".key")
keyfile.unlink()
dumper = DumperLoader.CLASSES[self.cache_type]()
fp = dumper.filepath(self.folder / uid)
with utils.fast_unlink(fp): # moves then delete to avoid weird effects
pass
info = self._key_info.pop(key)
dinfo: DumpInfo = info["_dump_info"]
loader = self._get_loader(dinfo.cache_type)
if isinstance(loader, StaticDumperLoader): # legacy
keyfile = self.folder / (info["filename"][: -len(loader.SUFFIX)] + ".key")
keyfile.unlink(missing_ok=True)
brange = dinfo.byte_range
if brange[0] != brange[1]:
# overwrite with whitespaces
with dinfo.jsonl.open("rb+") as f:
f.seek(brange[0])
f.write(b" " * (brange[1] - brange[0] - 1))
info = {x: y for x, y in info.items() if not x.startswith("_")}
if len(info) == 1: # only filename -> we can remove it as it is not shared
# moves then delete to avoid weird effects
with utils.fast_unlink(Path(self.folder) / info["filename"]):
pass
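
Deletion never rewrites the whole jsonl file: the entry's byte_range is overwritten with spaces (keeping the trailing newline) so other entries stay at their recorded offsets, and readers simply skip the blank line. A standalone sketch of that tombstoning trick, with illustrative cache type and file names:

import json
import pathlib
import tempfile

fp = pathlib.Path(tempfile.mkdtemp()) / "demo-info.jsonl"
entries = [
    {"cache_type": "Pickle"},  # metadata line
    {"filename": "a.pkl", "_key": "a"},
    {"filename": "b.pkl", "_key": "b"},
]
byte_ranges = []
with fp.open("wb") as f:
    for entry in entries:
        start = f.tell()
        data = json.dumps(entry).encode("utf8")
        f.write(data + b"\n")
        byte_ranges.append((start, start + len(data) + 1))  # as stored in DumpInfo

# delete key "a": blank its bytes in place, keep the newline so offsets never shift
start, end = byte_ranges[1]
with fp.open("rb+") as f:
    f.seek(start)
    f.write(b" " * (end - start - 1))

# a later reader skips the blank line, so only the metadata and the "b" entry remain
print([line for line in fp.read_text("utf8").splitlines() if line.strip()])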

def __contains__(self, key: str) -> bool:
# in-memory cache
if key in self._ram_data:
return True
self._load_info_files()
if self.folder is not None:
# in folder (already read once)
if key in self._key_uid:
if key in self._key_info:
return True
# maybe in folder (never read it)
uid = _string_uid(key)
fp = self.folder / f"{uid}.key"
if fp.exists():
self._key_uid[key] = uid
if self.cache_type is None:
self._set_cache_type(None)
if fp.exists() and self.cache_type is not None:
loader = self._get_loader(self.cache_type)
if not isinstance(loader, StaticDumperLoader):
raise RuntimeError(
"Cannot regenerate info from non-static dumper-loader"
)
filename = _string_uid(key) + loader.SUFFIX
dinfo = DumpInfo(byte_range=(0, 0), jsonl=fp, cache_type=self.cache_type)
self._key_info[key] = {"filename": filename, "_dump_info": dinfo}
return True
return False # lazy check