diff --git a/Makefile.in b/Makefile.in index b82053d90..77599664a 100644 --- a/Makefile.in +++ b/Makefile.in @@ -46,21 +46,6 @@ fix_shebangs: find $(DESTDIR) -type f -name "*.sh" -exec sed -i '1s|^#!/usr/bin/env sh|#!$(SHELL_INTERPRETER)|' {} \; find $(DESTDIR) -type f -name "umu-run" -exec sed -i '1s|^#!/usr/bin/env sh|#!$(SHELL_INTERPRETER)|' {} \; -# Special case, do this inside the source directory for release distribution -umu/umu_version.json: umu/umu_version.json.in - $(info :: Updating $(@) ) - cp $(<) $(<).tmp - sed 's|##UMU_VERSION##|$(shell git describe --always --long --tags)|g' -i $(<).tmp - mv $(<).tmp $(@) - -.PHONY: version -version: umu/umu_version.json - -version-install: version - $(info :: Installing umu_version.json ) - install -d $(DESTDIR)$(PYTHONDIR)/$(INSTALLDIR) - install -Dm 644 umu/umu_version.json -t $(DESTDIR)$(PYTHONDIR)/$(INSTALLDIR) - $(OBJDIR)/.build-umu-docs: | $(OBJDIR) $(info :: Building umu man pages ) @@ -79,7 +64,7 @@ umu-docs-install: umu-docs install -m644 $(OBJDIR)/umu.5 $(DESTDIR)$(MANDIR)/man5/umu.5 -$(OBJDIR)/.build-umu-dist: | $(OBJDIR) version +$(OBJDIR)/.build-umu-dist: | $(OBJDIR) $(info :: Building umu ) $(PYTHON_INTERPRETER) -m build --wheel --skip-dependency-check --no-isolation --outdir=$(OBJDIR) touch $(@) @@ -93,9 +78,9 @@ umu-dist-install: umu-dist $(PYTHON_INTERPRETER) -m installer --destdir=$(DESTDIR) $(OBJDIR)/*.whl ifeq ($(FLATPAK), xtrue) -umu-install: version-install umu-dist-install +umu-install: umu-dist-install else -umu-install: version-install umu-dist-install umu-docs-install +umu-install: umu-dist-install umu-docs-install endif ifeq ($(FLATPAK), xtrue) @@ -148,7 +133,7 @@ $(OBJDIR): .PHONY: clean clean: $(info :: Cleaning source directory ) - @rm -rf -v $(OBJDIR) umu/umu_version.json ./$(RELEASEDIR) $(RELEASEDIR).tar.gz + @rm -rf -v $(OBJDIR) ./$(RELEASEDIR) $(RELEASEDIR).tar.gz RELEASEDIR := $(PROJECT)-$(shell git describe --abbrev=0) @@ -156,7 +141,7 @@ $(RELEASEDIR): mkdir -p $(@) .PHONY: release -release: $(RELEASEDIR) | version zipapp +release: $(RELEASEDIR) | zipapp $(info :: Creating source distribution for release ) mkdir -p $(<) rm -rf umu/__pycache__ @@ -177,11 +162,10 @@ ZIPAPP := $(OBJDIR)/umu-run ZIPAPP_STAGING := $(OBJDIR)/zipapp_staging ZIPAPP_VENV := $(OBJDIR)/zipapp_venv -$(OBJDIR)/.build-zipapp: | $(OBJDIR) version +$(OBJDIR)/.build-zipapp: | $(OBJDIR) $(info :: Building umu-launcher as zipapp ) $(PYTHON_INTERPRETER) -m venv $(ZIPAPP_VENV) . $(ZIPAPP_VENV)/bin/activate && python3 -m pip install -t "$(ZIPAPP_STAGING)" -U --no-compile . - install -Dm644 umu/umu_version.json "$(ZIPAPP_STAGING)"/umu/umu_version.json cp umu/__main__.py "$(ZIPAPP_STAGING)" find "$(ZIPAPP_STAGING)" -exec touch -h -d "$(SOURCE_DATE_EPOCH)" {} + . $(ZIPAPP_VENV)/bin/activate && python3 -m zipapp $(ZIPAPP_STAGING) -o $(ZIPAPP) -p "$(PYTHON_INTERPRETER)" -c diff --git a/umu/__init__.py b/umu/__init__.py index 520d6cb42..748dd2b6b 100644 --- a/umu/__init__.py +++ b/umu/__init__.py @@ -1 +1,6 @@ __version__ = "1.1.1" # noqa: D104 +__pressure_vessel_runtime__ = ("steamrt3", "sniper", "1628350") +__pressure_vessel_runtimes__ = ( + ("steamrt3", "sniper", "1628350"), + ("steamrt2", "soldier", "1391110"), +) diff --git a/umu/umu_consts.py b/umu/umu_consts.py index a192f315b..1910b84fd 100644 --- a/umu/umu_consts.py +++ b/umu/umu_consts.py @@ -1,8 +1,6 @@ import os from pathlib import Path -CONFIG = "umu_version.json" - STEAM_COMPAT: Path = Path.home().joinpath( ".local", "share", "Steam", "compatibilitytools.d" ) diff --git a/umu/umu_run.py b/umu/umu_run.py index adf3ca3fb..cf3cf1a0b 100755 --- a/umu/umu_run.py +++ b/umu/umu_run.py @@ -38,6 +38,7 @@ from Xlib.protocol.rq import Event from Xlib.xobject.drawable import Window +from umu import __pressure_vessel_runtime__ from umu.umu_consts import ( PR_SET_CHILD_SUBREAPER, PROTON_VERBS, @@ -51,6 +52,8 @@ from umu.umu_util import ( get_libc, get_library_paths, + get_vdf_value, + get_vdfs, is_installed_verb, is_winetricks_verb, xdisplay, @@ -733,6 +736,7 @@ def main() -> int: # noqa: D103 "UMU_RUNTIME_UPDATE": "", } opts: list[str] = [] + runtime_platform: Path = UMU_LOCAL / __pressure_vessel_runtime__[0] prereq: bool = False root: Traversable @@ -804,7 +808,11 @@ def main() -> int: # noqa: D103 # Setup the launcher and runtime files future: Future = thread_pool.submit( - setup_umu, root, UMU_LOCAL, thread_pool + setup_umu, + root, + UMU_LOCAL, + __pressure_vessel_runtime__, + thread_pool, ) if isinstance(args, Namespace): @@ -840,8 +848,51 @@ def main() -> int: # noqa: D103 ): sys.exit(1) + # Get Proton's required compatibility tool in its .vdf file. + # The App ID is the runtime that it's built against + toolappid_proton: str = get_vdf_value( + Path(env["PROTONPATH"], "toolmanifest.vdf"), "require_tool_appid" + ) + has_matching_compat_tools: bool = False + + # Since it's common for people to change Proton versions, verify we're + # using Proton's intended runtime and auto change to it if downloaded + for file in get_vdfs(UMU_LOCAL, "**/steampipe/app_build*.vdf"): + if get_vdf_value(file, "appid") == toolappid_proton: + runtime_platform: Path = file.parent.parent + log.debug("App ID (Proton): %s", toolappid_proton) + log.debug("Steam Linux Runtime: %s", file.parent.parent.name) + log.console( + f"Using {Path(env['PROTONPATH']).name} with " + f"{runtime_platform.name}" + ) + has_matching_compat_tools = True + break + + # Warn if compatibility tools mismatch (e.g., using proton 7 with sniper) + # Assuming our parser is correct, at this point, the user is probably + # trying to use a proton without its required runtime. This may happen if + # we did not bump the runtime in time because, at the time of this writing, + # we're currently assuming the usage of sniper. In this case, just warn + # them and opt not to set an appropriate proton for the user to avoid the + # risk of breaking users' pfxs + if not has_matching_compat_tools: + log.warning("Compatibility tools mismatch") + log.warning( + "'%s' is not a required compatibility tool for '%s'", + runtime_platform, + env["PROTONPATH"], + ) + log.warning( + "Prefix '%s' was created with compatibility tool %s", + env["WINEPREFIX"], + Path(env["PROTONPATH"], "version").read_text(encoding="utf-8"), + ) + # Build the command - command: tuple[Path | str, ...] = build_command(env, UMU_LOCAL, opts) + command: tuple[Path | str, ...] = build_command( + env, runtime_platform, opts + ) log.debug("%s", command) # Run the command @@ -858,7 +909,3 @@ def main() -> int: # noqa: D103 sys.exit(e.code) except BaseException as e: log.exception(e) - finally: - UMU_LOCAL.joinpath(".ref").unlink( - missing_ok=True - ) # Cleanup .ref file on every exit diff --git a/umu/umu_runtime.py b/umu/umu_runtime.py index bd25ca35d..a8a54a463 100644 --- a/umu/umu_runtime.py +++ b/umu/umu_runtime.py @@ -10,20 +10,24 @@ except ModuleNotFoundError: from importlib.abc import Traversable -from json import load from pathlib import Path from secrets import token_urlsafe from shutil import move, rmtree from subprocess import run from tarfile import open as taropen from tempfile import TemporaryDirectory, mkdtemp -from typing import Any from filelock import FileLock -from umu.umu_consts import CONFIG, UMU_CACHE, UMU_LOCAL +from umu import __pressure_vessel_runtimes__ +from umu.umu_consts import UMU_CACHE from umu.umu_log import log -from umu.umu_util import find_obsolete, https_connection, run_zenity +from umu.umu_util import ( + find_obsolete, + get_vdf_value, + https_connection, + run_zenity, +) try: from tarfile import tar_filter @@ -34,24 +38,62 @@ def _install_umu( - json: dict[str, Any], + local: Path, + runtime_platform: tuple[str, str, str], thread_pool: ThreadPoolExecutor, client_session: HTTPSConnection, ) -> None: resp: HTTPResponse + archive: str + base_url: str tmp: Path = Path(mkdtemp()) ret: int = 0 # Exit code from zenity + token: str = f"?versions={token_urlsafe(16)}" + + # When using an existing obsolete proton build, download its intended + # runtime. Handles cases where on a new install, the user or the client + # passes and existing obsolete proton build but its corresponding runtime + # has not been downloaded for them yet. + if _is_obsolete_umu(runtime_platform): + toolmanifest: Path = Path(os.environ["PROTONPATH"], "toolmanifest.vdf") + compat_tool: str = get_vdf_value( + toolmanifest, + "require_tool_appid", + ) + + log.warning( + "%s is obsolete, downloading obsolete steamrt", + toolmanifest.parent.name, + ) + + # Change runtime paths and runtime base platform + for pv_runtime in __pressure_vessel_runtimes__: + if compat_tool in pv_runtime: + log.debug( + "Changing SLR base platform: %s -> %s", + runtime_platform, + pv_runtime, + ) + log.debug( + "Changing base directory: '%s' -> '%s'", + local, + local.parent / pv_runtime[0], + ) + runtime_platform = pv_runtime + local = local.parent / pv_runtime[0] + break + + local.mkdir(parents=True, exist_ok=True) + # Codename for the runtime (e.g., 'sniper') - codename: str = json["umu"]["versions"]["runtime_platform"] # Archive containing the runtime - archive: str = f"SteamLinuxRuntime_{codename}.tar.xz" - base_url: str = ( - f"https://repo.steampowered.com/steamrt-images-{codename}" + archive = f"SteamLinuxRuntime_{runtime_platform[1]}.tar.xz" + base_url = ( + f"https://repo.steampowered.com/steamrt-images-{runtime_platform[1]}" "/snapshots/latest-container-runtime-public-beta" ) - token: str = f"?versions={token_urlsafe(16)}" - log.debug("Codename: %s", codename) + log.debug("Codename: %s", runtime_platform[1]) log.debug("URL: %s", base_url) # Download the runtime and optionally create a popup with zenity @@ -76,7 +118,7 @@ def _install_umu( if not os.environ.get("UMU_ZENITY") or ret: digest: str = "" endpoint: str = ( - f"/steamrt-images-{codename}" + f"/steamrt-images-{runtime_platform[1]}" "/snapshots/latest-container-runtime-public-beta" ) hashsum = sha256() @@ -98,7 +140,9 @@ def _install_umu( break # Download the runtime - log.console(f"Downloading latest steamrt {codename}, please wait...") + log.console( + f"Downloading latest steamrt {runtime_platform[1]}, please wait..." + ) client_session.request("GET", f"{endpoint}/{archive}{token}") with ( @@ -148,9 +192,6 @@ def _install_umu( log.warning("Using no data filter for archive") log.warning("Archive will be extracted insecurely") - # Ensure the target directory exists - UMU_LOCAL.mkdir(parents=True, exist_ok=True) - # Extract the entirety of the archive w/ or w/o the data filter log.debug( "Extracting: %s -> %s", f"{tmpcache}/{archive}", tmpcache @@ -158,14 +199,16 @@ def _install_umu( tar.extractall(path=tmpcache) # noqa: S202 # Move the files to the correct location - source_dir: Path = Path(tmpcache, f"SteamLinuxRuntime_{codename}") + source_dir: Path = tmp.joinpath( + f"SteamLinuxRuntime_{runtime_platform[1]}" + ) log.debug("Source: %s", source_dir) - log.debug("Destination: %s", UMU_LOCAL) + log.debug("Destination: %s", local) # Move each file to the dest dir, overwriting if exists futures.extend( [ - thread_pool.submit(_move, file, source_dir, UMU_LOCAL) + thread_pool.submit(_move, file, source_dir, local) for file in source_dir.glob("*") ] ) @@ -178,19 +221,24 @@ def _install_umu( # Rename _v2-entry-point log.debug("Renaming: _v2-entry-point -> umu") - UMU_LOCAL.joinpath("_v2-entry-point").rename(UMU_LOCAL.joinpath("umu")) + local.joinpath("_v2-entry-point").rename(local.joinpath("umu")) # Validate the runtime after moving the files - check_runtime(UMU_LOCAL, json) + check_runtime(local, runtime_platform[1]) def setup_umu( - root: Traversable, local: Path, thread_pool: ThreadPoolExecutor + root: Traversable, + local: Path, + runtime_platform: tuple[str, str, str], + thread_pool: ThreadPoolExecutor, ) -> None: """Install or update the runtime for the current user.""" log.debug("Root: %s", root) log.debug("Local: %s", local) - json: dict[str, Any] = _get_json(root, CONFIG) + log.debug("Steam Linux Runtime (latest): %s", runtime_platform[0]) + log.debug("Codename: %s", runtime_platform[1]) + log.debug("App ID: %s", runtime_platform[2]) host: str = "repo.steampowered.com" # New install or umu dir is empty @@ -199,10 +247,10 @@ def setup_umu( log.console( "Setting up Unified Launcher for Windows Games on Linux..." ) - local.mkdir(parents=True, exist_ok=True) with https_connection(host) as client_session: _restore_umu( - json, + local / runtime_platform[0], + runtime_platform, thread_pool, lambda: local.joinpath("umu").is_file(), client_session, @@ -216,12 +264,17 @@ def setup_umu( find_obsolete() with https_connection(host) as client_session: - _update_umu(local, json, thread_pool, client_session) + _update_umu( + local / runtime_platform[0], + runtime_platform, + thread_pool, + client_session, + ) def _update_umu( local: Path, - json: dict[str, Any], + runtime_platform: tuple[str, str, str], thread_pool: ThreadPoolExecutor, client_session: HTTPSConnection, ) -> None: @@ -232,30 +285,66 @@ def _update_umu( """ runtime: Path resp: HTTPResponse - codename: str = json["umu"]["versions"]["runtime_platform"] - endpoint: str = ( - f"/steamrt-images-{codename}" - "/snapshots/latest-container-runtime-public-beta" - ) + endpoint: str token: str = f"?version={token_urlsafe(16)}" + is_obsolete: bool = _is_obsolete_umu(runtime_platform) + log.debug("Existing install detected") log.debug("Sending request to '%s'...", client_session.host) + # When using an existing obsolete proton build, skip its updates but allow + # restoring it + if is_obsolete: + toolmanifest: Path = Path(os.environ["PROTONPATH"], "toolmanifest.vdf") + compat_tool: str = get_vdf_value( + toolmanifest, + "require_tool_appid", + ) + + # Change runtime paths and runtime base platform + for pv_runtime in __pressure_vessel_runtimes__: + if compat_tool in pv_runtime: + log.debug( + "Changing SLR base platform: %s -> %s", + runtime_platform, + pv_runtime, + ) + log.debug( + "Changing base directory: '%s' -> '%s'", + local, + local.parent / pv_runtime[0], + ) + runtime_platform = pv_runtime + local = local.parent / pv_runtime[0] + break + + endpoint = ( + f"/steamrt-images-{runtime_platform[1]}" + "/snapshots/latest-container-runtime-public-beta" + ) + # Find the runtime directory (e.g., sniper_platform_0.20240530.90143) # Assume the directory begins with the alias try: runtime = max( - file for file in local.glob(f"{codename}*") if file.is_dir() + file + for file in local.glob(f"{runtime_platform[1]}*") + if file.is_dir() ) except ValueError: log.debug("*_platform_* directory missing in '%s'", local) log.warning("Runtime Platform not found") log.console("Restoring Runtime Platform...") _restore_umu( - json, + local, + runtime_platform, thread_pool, lambda: len( - [file for file in local.glob(f"{codename}*") if file.is_dir()] + [ + file + for file in local.glob(f"{runtime_platform[1]}*") + if file.is_dir() + ] ) > 0, client_session, @@ -263,14 +352,14 @@ def _update_umu( return log.debug("Runtime: %s", runtime.name) - log.debug("Codename: %s", codename) if not local.joinpath("pressure-vessel").is_dir(): log.debug("pressure-vessel directory missing in '%s'", local) log.warning("Runtime Platform not found") log.console("Restoring Runtime Platform...") _restore_umu( - json, + local, + runtime_platform, thread_pool, lambda: local.joinpath("pressure-vessel").is_dir(), client_session, @@ -283,7 +372,7 @@ def _update_umu( if not local.joinpath("VERSIONS.txt").is_file(): url: str release: Path = runtime.joinpath("files", "lib", "os-release") - versions: str = f"SteamLinuxRuntime_{codename}.VERSIONS.txt" + versions: str = f"SteamLinuxRuntime_{runtime_platform[1]}.VERSIONS.txt" log.debug("VERSIONS.txt file missing in '%s'", local) @@ -294,7 +383,8 @@ def _update_umu( log.warning("Runtime Platform corrupt") log.console("Restoring Runtime Platform...") _restore_umu( - json, + local, + runtime_platform, thread_pool, lambda: local.joinpath("VERSIONS.txt").is_file(), client_session, @@ -310,7 +400,8 @@ def _update_umu( line.removeprefix("BUILD_ID=").rstrip().strip('"') ) url = ( - f"/steamrt-images-{codename}" f"/snapshots/{build_id}" + f"/steamrt-images-{runtime_platform[1]}" + f"/snapshots/{build_id}" ) break @@ -338,6 +429,14 @@ def _update_umu( resp.read().decode() ) + # Skip SLR updates when not using the latest + if is_obsolete: + log.warning( + "%s is obsolete, skipping steamrt update", + Path(os.environ["PROTONPATH"]).name, + ) + return + # Update the runtime if necessary by comparing VERSIONS.txt to the remote # repo.steampowered currently sits behind a Cloudflare proxy, which may # respond with cf-cache-status: HIT in the header for subsequent requests @@ -345,7 +444,10 @@ def _update_umu( # has control over the CDN's cache control behavior, so we must not assume # all of the cache will be purged after new files are uploaded. Therefore, # always avoid the cache by appending a unique query to the URI - url: str = f"{endpoint}/SteamLinuxRuntime_{codename}.VERSIONS.txt{token}" + url: str = ( + f"{endpoint}/SteamLinuxRuntime_{runtime_platform[1]}.VERSIONS.txt" + f"{token}" + ) client_session.request("GET", url) # Attempt to compare the digests @@ -382,7 +484,9 @@ def _update_umu( ): log.debug("Released file lock '%s'", lock.lock_file) return - _install_umu(json, thread_pool, client_session) + _install_umu( + local, runtime_platform, thread_pool, client_session + ) log.debug("Removing: %s", runtime) rmtree(str(runtime)) log.debug("Released file lock '%s'", lock.lock_file) @@ -390,50 +494,6 @@ def _update_umu( log.console("steamrt is up to date") -def _get_json(path: Traversable, config: str) -> dict[str, Any]: - """Validate the state of the configuration file umu_version.json in a path. - - The configuration file will be used to update the runtime and it reflects - the tools currently used by launcher. The key/value pairs umu and versions - must exist. - """ - json: dict[str, Any] - # Steam Runtime platform values - # See https://gitlab.steamos.cloud/steamrt/steamrt/-/wikis/home - steamrts: set[str] = { - "soldier", - "sniper", - "medic", - "steamrt5", - } - - # umu_version.json in the system path should always exist - if not path.joinpath(config).is_file(): - err: str = ( - f"File not found: {config}\n" - "Please reinstall the package to recover configuration file" - ) - raise FileNotFoundError(err) - - with path.joinpath(config).open(mode="r", encoding="utf-8") as file: - json = load(file) - - # Raise an error if "umu" and "versions" doesn't exist - if not json or "umu" not in json or "versions" not in json["umu"]: - err: str = ( - f"Failed to load {config} or 'umu' or 'versions' not in: {config}" - ) - raise ValueError(err) - - # The launcher will use the value runtime_platform to glob files. Attempt - # to guard against directory removal attacks for non-system wide installs - if json["umu"]["versions"]["runtime_platform"] not in steamrts: - err: str = "Value for 'runtime_platform' is not a steamrt" - raise ValueError(err) - - return json - - def _move(file: Path, src: Path, dst: Path) -> None: """Move a file or directory to a destination. @@ -453,7 +513,7 @@ def _move(file: Path, src: Path, dst: Path) -> None: move(src_file, dest_file) -def check_runtime(src: Path, json: dict[str, Any]) -> int: +def check_runtime(src: Path, codename: str) -> int: """Validate the file hierarchy of the runtime platform. The mtree file included in the Steam runtime platform will be used to @@ -461,7 +521,6 @@ def check_runtime(src: Path, json: dict[str, Any]) -> int: home directory and used to run games. """ runtime: Path - codename: str = json["umu"]["versions"]["runtime_platform"] pv_verify: Path = src.joinpath("pressure-vessel", "bin", "pv-verify") ret: int = 1 @@ -501,16 +560,31 @@ def check_runtime(src: Path, json: dict[str, Any]) -> int: def _restore_umu( - json: dict[str, Any], + local: Path, + runtime_platform: tuple[str, str, str], thread_pool: ThreadPoolExecutor, callback_fn: Callable[[], bool], client_session: HTTPSConnection, ) -> None: - with FileLock(f"{UMU_LOCAL}/umu.lock") as lock: + with FileLock(f"{local.parent}/umu.lock") as lock: log.debug("Acquired file lock '%s'...", lock.lock_file) if callback_fn(): log.debug("Released file lock '%s'", lock.lock_file) log.console("steamrt was restored") return - _install_umu(json, thread_pool, client_session) + _install_umu(local, runtime_platform, thread_pool, client_session) log.debug("Released file lock '%s'", lock.lock_file) + + +def _is_obsolete_umu(runtime_platform: tuple[str, str, str]) -> bool: + return bool( + os.environ.get("PROTONPATH") + and os.environ.get("PROTONPATH") != "GE-Proton" + and get_vdf_value( + Path(os.environ["PROTONPATH"], "toolmanifest.vdf").resolve( + strict=True + ), + "require_tool_appid", + ) + not in runtime_platform + ) diff --git a/umu/umu_test.py b/umu/umu_test.py index b8ddc9657..9198cf547 100644 --- a/umu/umu_test.py +++ b/umu/umu_test.py @@ -1,9 +1,11 @@ import argparse import json import os +import random import re import sys import tarfile +import tempfile import unittest from argparse import Namespace from concurrent.futures import ThreadPoolExecutor @@ -69,6 +71,29 @@ def setUp(self): self.test_compat = Path("./tmp.ZssGZoiNod") # umu-proton dir self.test_proton_dir = Path("UMU-Proton-5HYdpddgvs") + # toolmanifest.vdf within Proton + self.test_vdf = ( + '"manifest"\n' + "{\n" + ' "version" "2"\n' + ' "commandline" "/proton %verb%"\n' + ' "require_tool_appid" "1628350"\n' + ' "use_sessions" "1"\n' + ' "compatmanager_layer_name" "proton"\n' + "}" + ) + # app_build_*.vdf within the runtime dir + self.test_app_build_vdf = ( + '"appbuild"\n' + "{\n" + ' "appid" "1628350"\n' + ' "buildoutput" "output"\n' + ' "depots"\n' + " {\n" + ' "1628351" "depot_build_1628351.vdf"\n' + " }\n" + "}" + ) # umu-proton release self.test_archive = Path(self.test_cache).joinpath( f"{self.test_proton_dir}.tar.gz" @@ -104,6 +129,7 @@ def setUp(self): self.test_proton_dir.mkdir(exist_ok=True) self.test_usr.mkdir(exist_ok=True) self.test_cache_home.mkdir(exist_ok=True) + self.test_local_share.joinpath("steampipe").mkdir(exist_ok=True) # Mock a valid configuration file at /usr/share/umu: # tmp.BXk2NnvW2m/umu_version.json @@ -132,6 +158,11 @@ def setUp(self): Path(self.test_user_share, "run-in-sniper").touch() Path(self.test_user_share, "umu").touch() + # Mock app_build_*.vdf in $HOME/.local/share/umu/steampipe + self.test_local_share.joinpath( + "steampipe", "app_build_1628350.vdf" + ).write_text(self.test_app_build_vdf, encoding="utf-8") + # Mock pressure vessel Path(self.test_user_share, "pressure-vessel", "bin").mkdir( parents=True @@ -141,6 +172,11 @@ def setUp(self): self.test_user_share, "pressure-vessel", "bin", "pv-verify" ).touch() + # Mock toolmanifest.vdf in the dir + self.test_proton_dir.joinpath("toolmanifest.vdf").write_text( + self.test_vdf, encoding="utf-8" + ) + # Mock the proton file in the dir self.test_proton_dir.joinpath("proton").touch(exist_ok=True) @@ -192,6 +228,79 @@ def tearDown(self): if self.test_cache_home.exists(): rmtree(self.test_cache_home.as_posix()) + def test_get_vdf_na(self): + """Test get_vdf_value when the toolmanifest.vdf isn't found. + + Expects an empty string to be returned + """ + key = "qux" + + with tempfile.TemporaryDirectory(dir=Path.cwd()) as tmp: + toolmanifest = Path(tmp, "toolmanifest.vdf") + toolmanifest.write_text(self.test_vdf, encoding="utf-8") + result = umu_util.get_vdf_value(toolmanifest, key) + self.assertFalse( + result, "Expected an empty string when file was not found" + ) + + def test_get_vdf_binary(self): + """Test get_vdf_value when reading binary data. + + Expects an empty string to be returned when reading binary data + """ + key = "baz" + + with tempfile.TemporaryDirectory(dir=Path.cwd()) as tmp: + toolmanifest = Path(tmp, "toolmanifest.vdf") + toolmanifest.write_bytes(random.randbytes(16)) # noqa: S311 + result = umu_util.get_vdf_value(toolmanifest, key) + self.assertFalse( + result, "Expected an empty string when file was not found" + ) + result = umu_util.get_vdf_value( + self.test_proton_dir.joinpath("toolmanifest.vdf"), key + ) + self.assertFalse( + result, "Expected an empty string when reading binary" + ) + + def test_get_vdf_value_foo(self): + """Test get_vdf_value when a key does not exist. + + Expects an empty string to be returned. + """ + key = "foo" + expected = "" + + with tempfile.TemporaryDirectory(dir=Path.cwd()) as tmp: + toolmanifest = Path(tmp, "toolmanifest.vdf") + toolmanifest.write_text(self.test_vdf, encoding="utf-8") + result = umu_util.get_vdf_value( + self.test_proton_dir.joinpath("toolmanifest.vdf"), key + ) + self.assertEqual( + result, expected, f"Expected '{expected}', received '{result}'" + ) + + def test_get_vdf_value(self): + """Test get_vdf_value. + + Expects a value to be returned when passed a valid key from reading a + non-binary VDF file + """ + key = "require_tool_appid" + expected = "1628350" + + with tempfile.TemporaryDirectory(dir=Path.cwd()) as tmp: + toolmanifest = Path(tmp, "toolmanifest.vdf") + toolmanifest.write_text(self.test_vdf, encoding="utf-8") + result = umu_util.get_vdf_value( + self.test_proton_dir.joinpath("toolmanifest.vdf"), key + ) + self.assertEqual( + result, expected, f"Expected '{expected}', received '{result}'" + ) + def test_rearrange_gamescope_baselayer_none(self): """Test rearrange_gamescope_baselayer_order when passed correct seq. @@ -366,23 +475,19 @@ def test_check_runtime(self): If the pv-verify binary does not exist, a warning should be logged and the function should return """ - json_root = umu_runtime._get_json( - self.test_user_share, "umu_version.json" - ) + codename = "sniper" self.test_user_share.joinpath( "pressure-vessel", "bin", "pv-verify" ).unlink() - result = umu_runtime.check_runtime(self.test_user_share, json_root) + result = umu_runtime.check_runtime(self.test_user_share, codename) self.assertEqual(result, 1, "Expected the exit code 1") def test_check_runtime_success(self): """Test check_runtime when runtime validation succeeds.""" - json_root = umu_runtime._get_json( - self.test_user_share, "umu_version.json" - ) + codename = "sniper" mock = CompletedProcess(["foo"], 0) with patch.object(umu_runtime, "run", return_value=mock): - result = umu_runtime.check_runtime(self.test_user_share, json_root) + result = umu_runtime.check_runtime(self.test_user_share, codename) self.assertEqual(result, 0, "Expected the exit code 0") def test_check_runtime_dir(self): @@ -390,9 +495,7 @@ def test_check_runtime_dir(self): runtime = Path( self.test_user_share, "sniper_platform_0.20240125.75305" ) - json_root = umu_runtime._get_json( - self.test_user_share, "umu_version.json" - ) + codename = "sniper" # Mock the removal of the runtime directory # In the real usage when updating the runtime, this should not happen @@ -403,7 +506,7 @@ def test_check_runtime_dir(self): mock = CompletedProcess(["foo"], 1) with patch.object(umu_runtime, "run", return_value=mock): - result = umu_runtime.check_runtime(self.test_user_share, json_root) + result = umu_runtime.check_runtime(self.test_user_share, codename) self.assertEqual(result, 1, "Expected the exit code 1") def test_move(self): @@ -518,119 +621,6 @@ def test_ge_proton_none(self): os.environ.get("PROTONPATH"), "Expected empty string" ) - def test_get_json_err(self): - """Test _get_json when specifying a corrupted umu_version.json file. - - A ValueError should be raised because we expect 'umu' and 'version' - keys to exist - """ - test_config = """ - { - "foo": { - "versions": { - "launcher": "0.1-RC3", - "runner": "0.1-RC3", - "runtime_platform": "sniper" - } - } - } - """ - test_config2 = """ - { - "umu": { - "foo": { - "launcher": "0.1-RC3", - "runner": "0.1-RC3", - "runtime_platform": "sniper" - } - } - } - """ - # Remove the valid config created at setup - Path(self.test_user_share, "umu_version.json").unlink(missing_ok=True) - - Path(self.test_user_share, "umu_version.json").touch() - with Path(self.test_user_share, "umu_version.json").open( - mode="w", encoding="utf-8" - ) as file: - file.write(test_config) - - # Test when "umu" doesn't exist - with self.assertRaisesRegex(ValueError, "load"): - umu_runtime._get_json(self.test_user_share, "umu_version.json") - - # Test when "versions" doesn't exist - Path(self.test_user_share, "umu_version.json").unlink(missing_ok=True) - - Path(self.test_user_share, "umu_version.json").touch() - with Path(self.test_user_share, "umu_version.json").open( - mode="w", encoding="utf-8" - ) as file: - file.write(test_config2) - - with self.assertRaisesRegex(ValueError, "load"): - umu_runtime._get_json(self.test_user_share, "umu_version.json") - - def test_get_json_foo(self): - """Test _get_json when not specifying umu_version.json as 2nd arg. - - A FileNotFoundError should be raised - """ - with self.assertRaisesRegex(FileNotFoundError, "configuration"): - umu_runtime._get_json(self.test_user_share, "foo") - - def test_get_json_steamrt(self): - """Test _get_json when passed a non-steamrt value. - - This attempts to mitigate against directory removal attacks for user - installations in the home directory, since the launcher will remove the - old runtime on update. Currently expects runtime_platform value to be - 'soldier', 'sniper', 'medic' and 'steamrt5' - """ - config = { - "umu": { - "versions": { - "launcher": "0.1-RC3", - "runner": "0.1-RC3", - "runtime_platform": "foo", - } - } - } - test_config = json.dumps(config, indent=4) - - self.test_user_share.joinpath("umu_version.json").unlink( - missing_ok=True - ) - self.test_user_share.joinpath("umu_version.json").write_text( - test_config, encoding="utf-8" - ) - - with self.assertRaises(ValueError): - umu_runtime._get_json(self.test_user_share, "umu_version.json") - - def test_get_json(self): - """Test _get_json. - - This function is used to verify the existence and integrity of - umu_version.json file during the setup process - - umu_version.json is used to synchronize the state of 2 directories: - /usr/share/umu and ~/.local/share/umu - - An error should not be raised when passed a JSON we expect - """ - result = None - - self.assertTrue( - self.test_user_share.joinpath("umu_version.json").exists(), - "Expected umu_version.json to exist", - ) - - result = umu_runtime._get_json( - self.test_user_share, "umu_version.json" - ) - self.assertIsInstance(result, dict, "Expected a dict") - def test_latest_interrupt(self): """Test _get_latest when the user interrupts the download/extraction. @@ -1352,6 +1342,7 @@ def test_build_command(self): """ result_args = None test_command = [] + test_runtime_platform = ("steamrt3", "sniper", "1628350") # Mock the proton file Path(self.test_file, "proton").touch() @@ -1380,10 +1371,13 @@ def test_build_command(self): # Mock setting up the runtime with ( - patch.object(umu_runtime, "_install_umu", return_value=None), + patch.object(umu_runtime, "setup_umu", return_value=None), ): umu_runtime.setup_umu( - self.test_user_share, self.test_local_share, None + self.test_user_share, + self.test_local_share, + test_runtime_platform, + None, ) copytree( Path(self.test_user_share, "sniper_platform_0.20240125.75305"), diff --git a/umu/umu_test_plugins.py b/umu/umu_test_plugins.py index 723c22d79..2614cdf39 100644 --- a/umu/umu_test_plugins.py +++ b/umu/umu_test_plugins.py @@ -193,6 +193,7 @@ def test_build_command_entry(self): toml_path = self.test_file + "/" + test_toml result = None test_command = [] + test_runtime_platform = ("steamrt3", "sniper", "1628350") Path(toml_path).touch() # Mock the proton file @@ -220,10 +221,13 @@ def test_build_command_entry(self): # Mock setting up the runtime # Don't copy _v2-entry-point with ( - patch.object(umu_runtime, "_install_umu", return_value=None), + patch.object(umu_runtime, "setup_umu", return_value=None), ): umu_runtime.setup_umu( - self.test_user_share, self.test_local_share, None + self.test_user_share, + self.test_local_share, + test_runtime_platform, + None, ) copytree( Path(self.test_user_share, "sniper_platform_0.20240125.75305"), @@ -272,6 +276,7 @@ def test_build_command_proton(self): toml_path = self.test_file + "/" + test_toml result = None test_command = [] + test_runtime_platform = ("steamrt3", "sniper", "1628350") Path(toml_path).touch() with Path(toml_path).open(mode="w", encoding="utf-8") as file: @@ -298,7 +303,10 @@ def test_build_command_proton(self): patch.object(umu_runtime, "_install_umu", return_value=None), ): umu_runtime.setup_umu( - self.test_user_share, self.test_local_share, None + self.test_user_share, + self.test_local_share, + test_runtime_platform, + None, ) copytree( Path(self.test_user_share, "sniper_platform_0.20240125.75305"), @@ -348,6 +356,7 @@ def test_build_command_toml(self): toml_path = self.test_file + "/" + test_toml result = None test_command = [] + test_runtime_platform = ("steamrt3", "sniper", "1628350") Path(self.test_file + "/proton").touch() Path(toml_path).touch() @@ -373,10 +382,13 @@ def test_build_command_toml(self): # Mock setting up the runtime with ( - patch.object(umu_runtime, "_install_umu", return_value=None), + patch.object(umu_runtime, "setup_umu", return_value=None), ): umu_runtime.setup_umu( - self.test_user_share, self.test_local_share, None + self.test_user_share, + self.test_local_share, + test_runtime_platform, + None, ) copytree( Path(self.test_user_share, "sniper_platform_0.20240125.75305"), diff --git a/umu/umu_util.py b/umu/umu_util.py index f3bc1c579..473256192 100644 --- a/umu/umu_util.py +++ b/umu/umu_util.py @@ -1,4 +1,5 @@ import os +from collections.abc import Generator from contextlib import contextmanager from ctypes.util import find_library from functools import lru_cache @@ -9,6 +10,7 @@ from shutil import which from ssl import SSLContext, create_default_context from subprocess import PIPE, STDOUT, Popen, TimeoutExpired +from typing import Any from Xlib import display @@ -173,6 +175,57 @@ def is_winetricks_verb( return True +@lru_cache +def _parse_vdf(path: Path) -> dict[str, str]: + vdf: dict[str, str] = {} + + try: + # Assumes the input is a VDF file and conforms to the + # KeyValues Text File Format. Also assumes that the file + # is not in a compact form e.g., {foo:"bar",baz:"qux"} + # See https://developer.valvesoftware.com/wiki/VDF + # See https://developer.valvesoftware.com/wiki/KeyValues + with path.open(mode="r", encoding="utf-8") as file: + for line in (line for line in file if line.isascii()): + tokens: list[str] = line.split() + if len(tokens) != 2: + continue + # In each token, a double quote is used as a control + # character and a double quote must not be used within its + # name or value + # See https://developer.valvesoftware.com/wiki/KeyValues#About_KeyValues_Text_File_Format + vdf_key, vdf_val = (token.strip('"') for token in tokens) + vdf[vdf_key] = vdf_val + except (UnicodeDecodeError, FileNotFoundError) as e: + log.exception(e) + + return vdf + + +def get_vdf_value(path: Path, key: str) -> str: + """Get a value from a specific key in a VDF file.""" + if isinstance(path, Path) and not path.name.endswith(".vdf"): + return "" + + log.debug("Parsing '%s'", path) + for vdf_key, vdf_val in _parse_vdf(path).items(): + log.debug("%s=%s", vdf_key, vdf_val) + if key == vdf_key: + return vdf_val + + return "" + + +def get_vdfs(path: Path, pattern: str) -> Generator[Path, Any, None]: + """Get all *.vdf files within a path.""" + if pattern and pattern.endswith(".vdf"): + for path in path.glob(pattern): + yield path + + for path in path.glob("**/*/*.vdf"): + yield path + + def find_obsolete() -> None: """Find obsoleted launcher files and log them.""" home: Path = Path.home() diff --git a/umu/umu_version.json.in b/umu/umu_version.json.in deleted file mode 100644 index a0bb6ab13..000000000 --- a/umu/umu_version.json.in +++ /dev/null @@ -1,9 +0,0 @@ -{ - "umu": { - "versions": { - "launcher": "##UMU_VERSION##", - "runner": "##UMU_VERSION##", - "runtime_platform": "sniper" - } - } -}