diff --git a/checks.py b/checks.py index 064e0651..9d57830e 100644 --- a/checks.py +++ b/checks.py @@ -17,7 +17,8 @@ def esync_file_limits() -> bool: https://github.com/zfigura/wine/blob/esync/README.esync """ - with open('/proc/sys/fs/file-max', encoding='ascii') as fsmax: + # open is OK here + with open('/proc/sys/fs/file-max', encoding='ascii') as fsmax: # noqa: PTH123 max_files = fsmax.readline() if int(max_files) < 8192: log.warn(warning) diff --git a/config.py b/config.py index 0743be36..d25186ec 100644 --- a/config.py +++ b/config.py @@ -1,7 +1,7 @@ """Load configuration settings for protonfixes""" -import os from configparser import ConfigParser +from pathlib import Path try: from .logger import log @@ -25,7 +25,7 @@ CONF.read_string(DEFAULT_CONF) try: - CONF.read(os.path.expanduser(CONF_FILE)) + CONF.read(Path(CONF_FILE).expanduser()) except Exception: log.debug('Unable to read config file ' + CONF_FILE) @@ -38,9 +38,12 @@ def opt_bool(opt: str) -> bool: locals().update({x: opt_bool(y) for x, y in CONF['main'].items() if 'enable' in x}) -locals().update({x: os.path.expanduser(y) for x, y in CONF['path'].items()}) +locals().update({x: Path(y).expanduser() for x, y in CONF['path'].items()}) try: - [os.makedirs(os.path.expanduser(d)) for n, d in CONF['path'].items()] + [ + Path(d).expanduser().mkdir(parents=True, exist_ok=True) + for n, d in CONF['path'].items() + ] except OSError: pass diff --git a/download.py b/download.py index 64a853d1..455c772b 100644 --- a/download.py +++ b/download.py @@ -1,9 +1,9 @@ """Module with helper functions to download from file-hosting providers""" -import os import hashlib import urllib.request import http.cookiejar +from pathlib import Path GDRIVE_URL = 'https://drive.google.com/uc?id={}&export=download' @@ -36,16 +36,17 @@ def gdrive_download(gdrive_id: str, path: str) -> None: req = urllib.request.Request(f'{url}&confirm={confirm}') with urllib.request.urlopen(req, timeout=10) as resp: filename = get_filename(resp.getheaders()) - with open(os.path.join(path, filename), 'wb') as save_file: + with Path(path, filename).open('wb') as save_file: save_file.write(resp.read()) def sha1sum(filename: str) -> str: """Computes the sha1sum of the specified file""" - if not os.path.isfile(filename): + path = Path(filename) + if not path.is_file(): return '' hasher = hashlib.sha1() - with open(filename, 'rb') as hash_file: + with path.open('rb') as hash_file: buf = hash_file.read(HASH_BLOCK_SIZE) while len(buf) > 0: hasher.update(buf) diff --git a/engine.py b/engine.py index e19a777d..bda58ec4 100644 --- a/engine.py +++ b/engine.py @@ -3,6 +3,7 @@ import os import sys from .logger import log +from pathlib import Path class Engine: @@ -48,7 +49,7 @@ def _is_unity(self) -> bool: # Check .../Gamename_Data/Mono/etc/ dir for data_dir in data_list: - if os.path.exists(os.path.join(os.environ['PWD'], data_dir, 'Mono/etc')): + if Path(os.environ['PWD'], data_dir, 'Mono/etc').exists(): return True return False @@ -60,9 +61,7 @@ def _is_dunia2(self) -> bool: # Check .../data_win*/worlds/multicommon dir for data_dir in data_list: - if os.path.exists( - os.path.join(os.environ['PWD'], data_dir, 'worlds/multicommon') - ): + if Path(os.environ['PWD'], data_dir, 'worlds/multicommon').exists(): return True return False @@ -75,7 +74,7 @@ def _is_rage(self) -> bool: # for data_dir in dir_list: # if os.path.exists(os.path.join(os.environ['PWD'], data_dir, 'pc/data/cdimages')): # return True - if os.path.exists(os.path.join(os.environ['PWD'], 'pc/data/cdimages')): + if Path(os.environ['PWD'], 'pc/data/cdimages'): return True return False diff --git a/fix.py b/fix.py index f00835dc..64652698 100644 --- a/fix.py +++ b/fix.py @@ -42,42 +42,45 @@ def get_game_id() -> str: def get_game_name() -> str: """Trys to return the game name from environment variables""" pfx = os.environ.get('WINEPREFIX') or protonmain.g_session.env.get('WINEPREFIX') - script_dir = os.path.dirname(os.path.abspath(__file__)) + script_dir = os.path.dirname(os.path.abspath(__file__)) # noqa: PTH100, PTH120 if os.environ.get('UMU_ID'): - - if os.path.isfile(f'{pfx}/game_title'): - with open(f'{pfx}/game_title', encoding='utf-8') as file: + if os.path.isfile(f'{pfx}/game_title'): # noqa: PTH113 + with open(f'{pfx}/game_title', encoding='utf-8') as file: # noqa: PTH123 return file.readline() umu_id = os.environ['UMU_ID'] store = os.getenv('STORE', 'none') - csv_file_path = os.path.join(script_dir, 'umu-database.csv') + csv_file_path = os.path.join(script_dir, 'umu-database.csv') # noqa: PTH118 try: - with open(csv_file_path, newline='', encoding='utf-8') as csvfile: + with open(csv_file_path, newline='', encoding='utf-8') as csvfile: # noqa: PTH123 csvreader = csv.reader(csvfile) for row in csvreader: # Check if the row has enough columns and matches both UMU_ID and STORE if len(row) > 3 and row[3] == umu_id and row[1] == store: title = row[0] # Title is the first entry - with open(os.path.join(script_dir, 'game_title'), 'w', encoding='utf-8') as file: + with open( # noqa: PTH123 + os.path.join(script_dir, 'game_title'), # noqa: PTH118 + 'w', + encoding='utf-8', + ) as file: file.write(title) return title except FileNotFoundError: - log.warn(f"CSV file not found: {csv_file_path}") + log.warn(f'CSV file not found: {csv_file_path}') except Exception as ex: - log.debug(f"Error reading CSV file: {ex}") + log.debug(f'Error reading CSV file: {ex}') - log.warn("Game title not found in CSV") + log.warn('Game title not found in CSV') return 'UNKNOWN' try: log.debug('UMU_ID is not in environment') game_library = re.findall(r'.*/steamapps', os.environ['PWD'], re.IGNORECASE)[-1] - game_manifest = os.path.join(game_library, f'appmanifest_{get_game_id()}.acf') + game_manifest = os.path.join(game_library, f'appmanifest_{get_game_id()}.acf') # noqa: PTH118 - with open(game_manifest, encoding='utf-8') as appmanifest: + with open(game_manifest, encoding='utf-8') as appmanifest: # noqa: PTH123 for xline in appmanifest.readlines(): if 'name' in xline.strip(): name = re.findall(r'"[^"]+"', xline, re.UNICODE)[-1] @@ -129,16 +132,16 @@ def get_module_name(game_id: str, default: bool = False, local: bool = False) -> def _run_fix_local(game_id: str, default: bool = False) -> bool: """Check if a local gamefix is available first and run it""" - localpath = os.path.expanduser('~/.config/protonfixes/localfixes') + localpath = os.path.expanduser('~/.config/protonfixes/localfixes') # noqa: PTH111 module_name = game_id if not default else 'default' # Check if local gamefix exists - if not os.path.isfile(os.path.join(localpath, module_name + '.py')): + if not os.path.isfile(os.path.join(localpath, module_name + '.py')): # noqa: PTH113, PTH118 return False # Ensure local gamefixes are importable as modules via PATH - with open(os.path.join(localpath, '__init__.py'), 'a', encoding='utf-8'): - sys.path.append(os.path.expanduser('~/.config/protonfixes')) + with open(os.path.join(localpath, '__init__.py'), 'a', encoding='utf-8'): # noqa: PTH118, PTH123 + sys.path.append(os.path.expanduser('~/.config/protonfixes')) # noqa: PTH111 # Run fix return _run_fix(game_id, default, True) diff --git a/logger.py b/logger.py index 5905d6c6..1cc10bfe 100644 --- a/logger.py +++ b/logger.py @@ -2,6 +2,7 @@ import os import sys +from pathlib import Path class Log: @@ -30,8 +31,9 @@ def log(self, msg: str = '', level: str = 'INFO') -> None: fulltext = color + pfx + str(msg) + reset + os.linesep sys.stderr.write(fulltext) sys.stderr.flush() - with open('/tmp/test', 'a', 1, encoding='utf-8') as testfile: - testfile.write(logtext) + + with Path('/tmp/test').open(mode='a', encoding='utf-8', buffering=1) as file: + file.write(logtext) def info(self, msg: str) -> None: """Wrapper for printing info messages""" diff --git a/ruff.toml b/ruff.toml index 100a5f39..1ed459f9 100644 --- a/ruff.toml +++ b/ruff.toml @@ -54,7 +54,9 @@ select = [ "FLY", # Simplify and update syntax to our target Python version "UP", - "D" + "D", + # Enforce the use of pathlib when working with filesystem paths + "PTH", ] ignore = [ # Requiring a type for self is deprecated @@ -84,7 +86,7 @@ dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" [lint.per-file-ignores] # Relax docstring-related lint rules for gamefixes -"gamefixes-*" = ["D103", "D205"] +"gamefixes-*" = ["D103", "D205", "PTH"] [format] # Like Black, use double quotes for strings. diff --git a/steamhelper.py b/steamhelper.py index e742059f..80cb4765 100644 --- a/steamhelper.py +++ b/steamhelper.py @@ -1,10 +1,10 @@ """The Steamhelper allows the installation of Steam apps""" -import os import re import shutil import subprocess import time +from pathlib import Path libpaths = [] @@ -50,8 +50,10 @@ def _is_app_installed(appid: str) -> bool: is_installed = False for librarypath in libraries_path: - appmanifest_path = _get_manifest_path(appid, librarypath) - if os.path.exists(appmanifest_path): + appmanifest_path = Path( + librarypath, 'steamapps', f'appmanifest_{str(appid)}.acf' + ) + if appmanifest_path.exists(): # noqa: PTH110 state = _find_regex_groups(appmanifest_path, REGEX_STATE, 'state') if len(state) > 0 and int(state[0]) == 4: is_installed = True @@ -63,24 +65,17 @@ def _get_steam_libraries_path() -> list: """Get Steam Libraries Path""" if len(libpaths) == 0: for steampath in STEAM_DIRS: - libfile = os.path.join( - os.path.expanduser(steampath), 'steamapps', 'libraryfolders.vdf' - ) - if os.path.exists(libfile): + libfile = Path(steampath, 'steamapps', 'libraryfolders.vdf').expanduser() + if libfile.is_file(): libpaths.append(_find_regex_groups(libfile, REGEX_LIB, 'path')) break return libpaths -def _get_manifest_path(appid: str, librarypath: str) -> str: - """Get appmanifest path""" - return os.path.join(librarypath, 'steamapps', f'appmanifest_{str(appid)}.acf') - - -def _find_regex_groups(path: str, regex: re.Pattern, groupname: str) -> list: +def _find_regex_groups(path: Path, regex: re.Pattern, groupname: str) -> list: """Given a file and a regex with a named group groupname, return an array of all the matches""" matches = [] - with open(path, encoding='ascii') as re_file: + with path.open(encoding='ascii') as re_file: for line in re_file: search = regex.search(line) if search: diff --git a/util.py b/util.py index c7a5fac0..672ccd45 100644 --- a/util.py +++ b/util.py @@ -15,6 +15,7 @@ from socket import socket, AF_INET, SOCK_DGRAM from typing import Literal, Any, Callable, Union from collections.abc import Mapping, Generator +from pathlib import Path try: from .logger import log @@ -32,22 +33,21 @@ def which(appname: str) -> Union[str, None]: """Returns the full path of an executable in $PATH""" for path in os.environ['PATH'].split(os.pathsep): - fullpath = os.path.join(path, appname) - if os.path.exists(fullpath) and os.access(fullpath, os.X_OK): - return fullpath + fullpath = Path(path, appname) + if fullpath.exists() and os.access(fullpath, os.X_OK): + return str(fullpath) log.warn(str(appname) + 'not found in $PATH') return None def protondir() -> str: """Returns the path to proton""" - proton_dir = os.path.dirname(sys.argv[0]) - return proton_dir + return str(Path(sys.argv[0]).parent) def protonprefix() -> str: """Returns the wineprefix used by proton""" - return os.path.join(os.environ['STEAM_COMPAT_DATA_PATH'], 'pfx/') + return f"{os.environ['STEAM_COMPAT_DATA_PATH']}/pfx/" def protonnameversion() -> Union[str, None]: @@ -61,15 +61,15 @@ def protonnameversion() -> Union[str, None]: def protontimeversion() -> int: """Returns the version timestamp of proton from the `version` file""" - fullpath = os.path.join(protondir(), 'version') + fullpath = Path(protondir(), 'version') try: - with open(fullpath, encoding='ascii') as version: + with fullpath.open(mode='r', encoding='ascii') as version: for timestamp in version.readlines(): return int(timestamp.strip()) except OSError: - log.warn('Proton version file not found in: ' + fullpath) + log.warn(f'Proton version file not found in: {fullpath}') return 0 - log.warn('Proton version not parsed from file: ' + fullpath) + log.warn(f'Proton version not parsed from file: {fullpath}') return 0 @@ -101,11 +101,11 @@ def once( def wrapper(*args, **kwargs) -> None: # noqa: ANN002, ANN003 func_id = f'{func.__module__}.{func.__name__}' prefix = protonprefix() - directory = os.path.join(prefix, 'drive_c/protonfixes/run/') - file = os.path.join(directory, func_id) - if not os.path.exists(directory): - os.makedirs(directory) - if os.path.exists(file): + directory = Path(prefix, 'drive_c/protonfixes/run/') + file = directory / func_id + if not directory.exists(): + directory.mkdir(parents=True) + if file.exists(): return exception = None @@ -116,7 +116,7 @@ def wrapper(*args, **kwargs) -> None: # noqa: ANN002, ANN003 raise exc exception = exc - with open(file, 'a', encoding='ascii') as tmp: + with file.open(mode='a', encoding='ascii') as tmp: tmp.close() if exception: @@ -135,7 +135,7 @@ def _killhanging() -> None: badexes = ['mscorsvw.exe'] for pid in pids: try: - with open(os.path.join('/proc', pid, 'cmdline'), 'rb') as proc_cmd: + with Path('/proc', pid, 'cmdline').open(mode='rb') as proc_cmd: cmdline = proc_cmd.read() for exe in badexes: if exe in cmdline.decode(): @@ -146,9 +146,9 @@ def _killhanging() -> None: def _forceinstalled(verb: str) -> None: """Records verb into the winetricks.log.forced file""" - forced_log = os.path.join(protonprefix(), 'winetricks.log.forced') - with open(forced_log, 'a', encoding='ascii') as forcedlog: - forcedlog.write(verb + '\n') + forced_log = Path(protonprefix(), 'winetricks.log.forced') + with forced_log.open(mode='a', encoding='ascii') as forcedlog: + forcedlog.write(f'{verb}\n') def _checkinstalled(verb: str, logfile: str = 'winetricks.log') -> bool: @@ -156,7 +156,7 @@ def _checkinstalled(verb: str, logfile: str = 'winetricks.log') -> bool: if not isinstance(verb, str): return False - winetricks_log = os.path.join(protonprefix(), logfile) + winetricks_log = Path(protonprefix(), logfile) # Check for 'verb=param' verb types if len(verb.split('=')) > 1: @@ -164,7 +164,7 @@ def _checkinstalled(verb: str, logfile: str = 'winetricks.log') -> bool: wt_verb_param = verb.split('=')[1] wt_is_set = False try: - with open(winetricks_log, encoding='ascii') as tricklog: + with winetricks_log.open(mode='r', encoding='ascii') as tricklog: for xline in tricklog.readlines(): if re.findall(r'^' + wt_verb, xline.strip()): wt_is_set = bool(xline.strip() == wt_verb + wt_verb_param) @@ -173,7 +173,7 @@ def _checkinstalled(verb: str, logfile: str = 'winetricks.log') -> bool: return False # Check for regular verbs try: - with open(winetricks_log, encoding='ascii') as tricklog: + with winetricks_log.open(mode='r', encoding='ascii') as tricklog: if verb in reversed([x.strip() for x in tricklog.readlines()]): return True except OSError: @@ -197,20 +197,24 @@ def is_custom_verb(verb: str) -> Union[bool, str]: if verb == 'gui': return False - verb_name = verb + '.verb' + verb_name = f'{verb}.verb' verb_dir = 'verbs' + verbpath = Path('~/.config/protonfixes/localfixes/', verb_dir).expanduser() + # check local custom verbs - verbpath = os.path.expanduser('~/.config/protonfixes/localfixes/' + verb_dir) - if os.path.isfile(os.path.join(verbpath, verb_name)): - log.debug('Using local custom winetricks verb from: ' + verbpath) - return os.path.join(verbpath, verb_name) + verbfile = verbpath / verb_name + if verbfile.is_file(): + log.debug(f'Using local custom winetricks verb from: {verbpath}') + return str(verbfile) + + verbpath = Path(__file__).parent / verb_dir # check custom verbs - verbpath = os.path.join(os.path.dirname(__file__), verb_dir) - if os.path.isfile(os.path.join(verbpath, verb_name)): - log.debug('Using custom winetricks verb from: ' + verbpath) - return os.path.join(verbpath, verb_name) + verbfile = verbpath / verb_name + if verbfile.is_file(): + log.debug(f'Using custom winetricks verb from: {verbpath}') + return str(verbfile) return False @@ -245,7 +249,8 @@ def protontricks(verb: str) -> bool: env['WINETRICKS_LATEST_VERSION_CHECK'] = 'disabled' env['LD_PRELOAD'] = '' - winetricks_bin = os.path.abspath(__file__).replace('util.py', 'winetricks') + # Using os.path.abspath suffices + winetricks_bin = os.path.abspath(__file__).replace('util.py', 'winetricks') # noqa: PTH100 winetricks_cmd = [winetricks_bin, '--unattended'] + verb.split(' ') if verb == 'gui': winetricks_cmd = [winetricks_bin, '--unattended'] @@ -428,8 +433,8 @@ def patch_libcuda() -> bool: Returns true if the library was patched correctly. Otherwise returns false """ - cache_dir = os.path.expanduser('~/.cache/protonfixes') - os.makedirs(cache_dir, exist_ok=True) + cache_dir = Path('~/.cache/protonfixes').expanduser() + cache_dir.mkdir(exist_ok=True, parents=True) try: # Use shutil.which to find ldconfig binary @@ -461,9 +466,9 @@ def patch_libcuda() -> bool: # Parse the line to extract the path parts = line.strip().split(' => ') if len(parts) == 2: - path = parts[1].strip() - if os.path.exists(path): - libcuda_path = os.path.abspath(path) + path = Path(parts[1].strip()) + if path.exists(): + libcuda_path = path.resolve() break if not libcuda_path: @@ -472,10 +477,9 @@ def patch_libcuda() -> bool: log.info(f'Found 64-bit libcuda.so at: {libcuda_path}') - patched_library = os.path.join(cache_dir, 'libcuda.patched.so') + patched_library = cache_dir / 'libcuda.patched.so' try: - with open(libcuda_path, 'rb') as f: - binary_data = f.read() + binary_data = libcuda_path.read_bytes() except OSError as e: log.crit(f'Unable to read libcuda.so: {e}') return False @@ -496,18 +500,16 @@ def patch_libcuda() -> bool: patched_binary_data = bytes.fromhex(hex_data) try: - with open(patched_library, 'wb') as f: - f.write(patched_binary_data) - + patched_library.write_bytes(patched_binary_data) # Set permissions to rwxr-xr-x (755) - os.chmod(patched_library, 0o755) + patched_library.chmod(0o755) log.debug(f'Permissions set to rwxr-xr-x for {patched_library}') except OSError as e: log.crit(f'Unable to write patched libcuda.so to {patched_library}: {e}') return False log.info(f'Patched libcuda.so saved to: {patched_library}') - set_environment('LD_PRELOAD', patched_library) + set_environment('LD_PRELOAD', str(patched_library)) return True except Exception as e: @@ -556,13 +558,13 @@ def disable_uplay_overlay() -> bool: UPlay will overwrite settings.yml on launch, but keep this setting. """ - config_dir = os.path.join( + config_dir = Path( protonprefix(), 'drive_c/users/steamuser/Local Settings/Application Data/Ubisoft Game Launcher/', ) - config_file = os.path.join(config_dir, 'settings.yml') + config_file = config_dir / 'settings.yml' - os.makedirs(config_dir, exist_ok=True) + config_dir.mkdir(exist_ok=True) try: data = ( @@ -574,12 +576,12 @@ def disable_uplay_overlay() -> bool: 'user:\n' ' closebehavior: CloseBehavior_Close' ) - with open(config_file, 'a+', encoding='ascii') as file: + with config_file.open(mode='a+', encoding='ascii') as file: file.write(data) log.info('Disabled UPlay overlay') return True except OSError as err: - log.warn('Could not disable UPlay overlay: ' + err.strerror) + log.warn(f'Could not disable UPlay overlay: {err.strerror}') return False @@ -597,7 +599,8 @@ def create_dosbox_conf( return conf = configparser.ConfigParser() conf.read_dict(conf_dict) - with open(conf_file, 'w', encoding='ascii') as file: + # Using open than Path.open is OK in this context + with open(conf_file, 'w', encoding='ascii') as file: # noqa: PTH123 conf.write(file) @@ -606,40 +609,41 @@ def _get_case_insensitive_name(path: str) -> str: e.g /path/to/game/system/gothic.ini -> /path/to/game/System/GOTHIC.INI """ - if os.path.exists(path): + # TODO: Refactor this later to use pathlib + if os.path.exists(path): # noqa: PTH110 return path root = path # Find first existing directory in the tree - while not os.path.exists(root): + while not os.path.exists(root): # noqa: PTH110 root = os.path.split(root)[0] if root[len(root) - 1] not in ['/', '\\']: root = root + os.sep # Separate missing path from existing root - s_working_dir = path.replace(root, '').split(os.sep) + s_working_dir = path.replace(root, '').split(os.sep) # noqa: PTH206 paths_to_find = len(s_working_dir) # Keep track of paths we found so far paths_found = 0 # Walk through missing paths for directory in s_working_dir: - if not os.path.exists(root): + if not os.path.exists(root): # noqa: PTH110 break dir_list = os.listdir(root) found = False for existing_dir in dir_list: # Find matching filename on drive if existing_dir.lower() == directory.lower(): - root = os.path.join(root, existing_dir) + root = os.path.join(root, existing_dir) # noqa: PTH118 paths_found += 1 found = True # If path was not found append case that we were looking for if not found: - root = os.path.join(root, directory) + root = os.path.join(root, directory) # noqa: PTH118 paths_found += 1 # Append rest of the path if we were unable to find directory at any level if paths_to_find != paths_found: - root = os.path.join(root, os.sep.join(s_working_dir[paths_found:])) + root = os.path.join(root, os.sep.join(s_working_dir[paths_found:])) # noqa: PTH118 return root @@ -647,30 +651,29 @@ def _get_config_full_path(cfile: str, base_path: str) -> Union[str, None]: """Find game's config file""" # Start from 'user'/'game' directories or absolute path if base_path == 'user': - cfg_path = os.path.join( - protonprefix(), 'drive_c/users/steamuser/My Documents', cfile - ) + cfg_path = Path(protonprefix(), 'drive_c/users/steamuser/My Documents', cfile) else: if base_path == 'game': - cfg_path = os.path.join(get_game_install_path(), cfile) + cfg_path = Path(get_game_install_path(), cfile) else: - cfg_path = cfile - cfg_path = _get_case_insensitive_name(cfg_path) + cfg_path = Path(cfile) + cfg_path = Path(_get_case_insensitive_name(str(cfg_path))) - if os.path.exists(cfg_path) and os.access(cfg_path, os.F_OK): - log.debug('Found config file: ' + cfg_path) - return cfg_path + if cfg_path.exists() and os.access(cfg_path, os.F_OK): + log.debug(f'Found config file: {cfg_path}') + return str(cfg_path) - log.warn('Config file not found: ' + cfg_path) + log.warn(f'Config file not found: {cfg_path}') return None def create_backup_config(cfg_path: str) -> None: """Create backup config file""" # Backup - if not os.path.exists(cfg_path + '.protonfixes.bak'): + bak = Path(f'{cfg_path}.protonfixes.bak').expanduser() + if not bak.is_file(): log.info('Creating backup for config file') - shutil.copyfile(cfg_path, cfg_path + '.protonfixes.bak') + shutil.copyfile(cfg_path, f'{cfg_path}.protonfixes.bak') def set_ini_options( @@ -694,7 +697,8 @@ def set_ini_options( log.info(f'Addinging INI options into {cfile}:\n{str(ini_opts)}') conf.read_string(ini_opts) - with open(cfg_path, 'w', encoding=encoding) as configfile: + # Using open is OK here + with open(cfg_path, 'w', encoding=encoding) as configfile: # noqa: PTH123 conf.write(configfile) return True @@ -711,13 +715,14 @@ def set_xml_options( # set options - base_size = os.path.getsize(xml_path) - backup_size = os.path.getsize(xml_path + '.protonfixes.bak') + xml_path = Path(xml_path).expanduser() + base_size = xml_path.stat().st_size + backup_size = xml_path.joinpath('.protonfixes.bak').stat().st_size if base_size != backup_size: return False - with open(xml_path, encoding='utf-8') as file: + with xml_path.open(mode='r', encoding='utf-8') as file: contents = file.readlines() i = 0 for line in contents: @@ -726,7 +731,7 @@ def set_xml_options( log.info(f'Adding XML options into {cfile}, line {i}:\n{xml_line}') contents.insert(i, xml_line + '\n') - with open(xml_path, 'w', encoding='utf-8') as file: + with xml_path.open(mode='w', encoding='utf-8') as file: for eachitem in contents: file.write(eachitem) @@ -737,7 +742,12 @@ def set_xml_options( def get_resolution() -> tuple[int, int]: """Returns screen res width, height using xrandr""" # Execute xrandr command and capture its output - xrandr_bin = os.path.abspath(__file__).replace('util.py', 'xrandr') + + # TODO: Should prefer to use a library instead of xrandr. + # Alternatively, xrandr has already been included in recent versions + # of sniper so we can do away with building it all together. This would + # decrease build times, latency, and the size of Proton delta patches + xrandr_bin = os.path.abspath(__file__).replace('util.py', 'xrandr') # noqa: PTH100 xrandr_output = subprocess.check_output([xrandr_bin, '--current']).decode('utf-8') # Find the line that starts with 'Screen 0:' and extract the resolution @@ -770,7 +780,7 @@ def set_dxvk_option( conf = configparser.ConfigParser() conf.optionxform = str section = conf.default_section - dxvk_conf = os.path.join(os.environ['PWD'], 'dxvk.conf') + dxvk_conf = Path(os.environ['PWD'], 'dxvk.conf') conf.read(cfile) @@ -786,7 +796,7 @@ def set_dxvk_option( conf.set(section, 'session', str(os.getpid())) if os.access(dxvk_conf, os.F_OK): - with open(dxvk_conf, encoding='ascii') as dxvk: + with dxvk_conf.open(encoding='ascii') as dxvk: conf.read_file(read_dxvk_conf(dxvk)) log.debug(f'{conf.items(section)}') @@ -794,7 +804,7 @@ def set_dxvk_option( log.info('Addinging DXVK option: ' + str(opt) + ' = ' + str(val)) conf.set(section, opt, str(val)) - with open(cfile, 'w', encoding='ascii') as configfile: + with Path(cfile).open(encoding='ascii') as configfile: conf.write(configfile) @@ -808,32 +818,36 @@ def install_battleye_runtime() -> None: install_app('1161040') -def install_all_from_tgz(url: str, path: str = os.getcwd()) -> None: +# os.getcwd is OK here +def install_all_from_tgz(url: str, path: str = os.getcwd()) -> None: # noqa: PTH109 """Install all files from a downloaded tar.gz""" - cache_dir = os.path.expanduser('~/.cache/protonfixes') - os.makedirs(cache_dir, exist_ok=True) - tgz_file_name = os.path.basename(url) - tgz_file_path = os.path.join(cache_dir, tgz_file_name) + # TODO: Consider getting rid of this functionality or at least refactoring. + # For this to be more useful and valuable, be more generic by handling different + # archive/compression types (e.g., zip, zstd, tar) and download/extract more efficiently + cache_dir = Path('~/.cache/protonfixes').expanduser() + cache_dir.mkdir(parents=True, exist_ok=True) + tgz_file_name = Path(url).expanduser().name + tgz_file_path = cache_dir.joinpath(tgz_file_name) if tgz_file_name not in os.listdir(cache_dir): log.info('Downloading ' + tgz_file_name) - urllib.request.urlretrieve(url, tgz_file_path) + urllib.request.urlretrieve(url, str(tgz_file_path)) with tarfile.open(tgz_file_path, 'r:gz') as tgz_obj: log.info(f'Extracting {tgz_file_name} to {path}') tgz_obj.extractall(path) -def install_from_zip(url: str, filename: str, path: str = os.getcwd()) -> None: +def install_from_zip(url: str, filename: str, path: str = os.getcwd()) -> None: # noqa: PTH109 """Install a file from a downloaded zip""" if filename in os.listdir(path): log.info(f'File {filename} found in {path}') return - cache_dir = os.path.expanduser('~/.cache/protonfixes') - os.makedirs(cache_dir, exist_ok=True) - zip_file_name = os.path.basename(url) - zip_file_path = os.path.join(cache_dir, zip_file_name) + cache_dir = Path('~/.cache/protonfixes').expanduser() + cache_dir.mkdir(parents=True, exist_ok=True) + zip_file_name = Path(url).expanduser().name + zip_file_path = cache_dir.joinpath(zip_file_name) if zip_file_name not in os.listdir(cache_dir): log.info(f'Downloading {filename} to {zip_file_path}') @@ -868,7 +882,8 @@ def is_smt_enabled() -> bool: If the check has failed, False is returned. """ try: - with open('/sys/devices/system/cpu/smt/active', encoding='ascii') as smt_file: + # open is OK here + with open('/sys/devices/system/cpu/smt/active', encoding='ascii') as smt_file: # noqa: PTH123 return smt_file.read().strip() == '1' except PermissionError: log.warn('No permission to read SMT status') @@ -948,6 +963,7 @@ def set_cpu_topology_limit(core_limit: int, ignore_user_setting: bool = False) - # Apply the limit return set_cpu_topology(core_limit, ignore_user_setting) + def set_game_drive(enabled: bool) -> None: """Enable or disable the game drive setting. @@ -966,6 +982,6 @@ def set_game_drive(enabled: bool) -> None: """ if enabled: - protonmain.g_session.compat_config.add("gamedrive") + protonmain.g_session.compat_config.add('gamedrive') else: - protonmain.g_session.compat_config.discard("gamedrive") + protonmain.g_session.compat_config.discard('gamedrive')