diff --git a/decrypt-ha-backup/__main__.py b/decrypt-ha-backup/__main__.py index 5191838..7ed4110 100644 --- a/decrypt-ha-backup/__main__.py +++ b/decrypt-ha-backup/__main__.py @@ -9,18 +9,27 @@ import random import getpass from typing import IO -import securetar import tempfile import platform from pathlib import Path +from .hacked_secure_tar_file import HackedSecureTarFile + +class FailureError(Exception): + """Indicates a failure with a user readable message attached""" + + def __init__(self, message: str) -> None: + """Initialize failure error.""" + super().__init__(message) + self.message = message + + def __str__(self) -> str: + """Return string representation of failure error.""" + return self.message -#PATH = "EncryptedFolders.tar" -PATH = "EncryptedFolders.tar" -PASSWORD = "orcsorcs" def password_to_key(password: str) -> bytes: """Generate a AES Key from password.""" - key: bytes = password.encode() + key: bytes = password.encode("utf-8") for _ in range(100): key = hashlib.sha256(key).digest() return key[:16] @@ -31,15 +40,8 @@ def key_to_iv(key: bytes) -> bytes: key = hashlib.sha256(key).digest() return key[:16] -def _generate_iv(key: bytes, salt: bytes) -> bytes: - """Generate an iv from data.""" - temp_iv = key + salt - for _ in range(100): - temp_iv = hashlib.sha256(temp_iv).digest() - return temp_iv[:16] - def overwrite(line: str): - sys.stdout.write(f"\r{line}\033[K") + sys.stdout.write(f"\r{line.encode('utf-8', 'replace').decode()}\033[K") def readTarMembers(tar: tarfile.TarFile): while(True): @@ -57,6 +59,8 @@ def __init__(self, tarfile: tarfile.TarFile): except KeyError: self._configMember = self._tarfile.getmember("./backup.json") json_file = self._tarfile.extractfile(self._configMember) + if not json_file: + raise FailureError("Backup doesn't contain a metadata file named 'snapshot.json' or 'backup.json'") self._config = json.loads(json_file.read()) json_file.close() self._items = [BackupItem(entry['slug'], entry['name'], self) for entry in self._config.get("addons")] @@ -118,6 +122,8 @@ def __init__(self, slug, name, backup: Backup): self._name = name self._backup = backup self._info = self._backup._tarfile.getmember(self.fileName) + if not self._info: + raise FailureError(f"Backup file doesn't contain a file for {self._name} with the name '{self.fileName}'") @property def fileName(self): @@ -141,7 +147,10 @@ def size(self): return self.info.size def _open(self): - return self._backup._tarfile.extractfile(self.info) + data = self._backup._tarfile.extractfile(self.info) + if not data: + raise FailureError(f"Backup file doesn't contain a file named {self.info.name}") + return data def _extractTo(self, file: IO): progress = 0 @@ -154,9 +163,7 @@ def _extractTo(self, file: IO): file.write(data) overwrite(f"Extracting '{self.name}' {round(100 * progress/self.size, 1)}%") progress += len(data) - file.flush() overwrite(f"Extracting '{self.name}' {round(100 * progress/self.size, 1)}%") - file.seek(0) print() def _copyTar(self, source: tarfile.TarFile, dest: tarfile.TarFile): @@ -167,12 +174,13 @@ def _copyTar(self, source: tarfile.TarFile, dest: tarfile.TarFile): else: dest.addfile(member, source.extractfile(member)) - def addTo(self, output: tarfile, key: bytes): - with tempfile.NamedTemporaryFile() as extracted: - self._extractTo(extracted) - overwrite(f"Decrypting '{self.name}'") - extracted.seek(0) - with securetar.SecureTarFile(Path(extracted.name), "r", key=key, gzip=self._backup.compressed) as decrypted: + def addTo(self, temp_folder: str, output: tarfile.TarFile, key: bytes): + temp_file = os.path.join(temp_folder, os.urandom(24).hex()) + try: + with open(temp_file, "wb") as f: + self._extractTo(f) + overwrite(f"Decrypting '{self.name}'") + with HackedSecureTarFile(Path(temp_file), key=key, gzip=self._backup.compressed) as decrypted: with tempfile.NamedTemporaryFile() as processed: tarmode = "w|" + ("gz" if self._backup.compressed else "") with tarfile.open(f"{self.slug}.tar", tarmode, fileobj=processed) as archivetar: @@ -187,6 +195,10 @@ def addTo(self, output: tarfile, key: bytes): output.addfile(info, processed) overwrite(f"Saving '{self.name}' done") print() + finally: + if os.path.isfile(temp_file): + os.remove(temp_file) + pass def main(): @@ -209,12 +221,13 @@ def main(): resp = input(f"The output file '{args.output_file}' already exists, do you want to overwrite it [y/n]?") if not resp.startswith("y"): print("Aborted") - exit() + exit(1) if args.password is None: # ask fro a password args.password = getpass.getpass("Backup Password:") + temp_dir = tempfile.gettempdir() try: with tarfile.open(Path(args.backup_file), "r:") as backup_file: backup = Backup(backup_file) @@ -226,10 +239,11 @@ def main(): return _key = password_to_key(args.password) + print(_key) with tarfile.open(args.output_file, "w:") as output: for archive in backup.items: - archive.addTo(output, _key) + archive.addTo(temp_dir, output, _key) # Add the modified backup config backup.addModifiedConfig(output) @@ -238,11 +252,15 @@ def main(): except tarfile.ReadError as e: if "not a gzip file" in str(e): print("The file could not be read as a gzip file. Please ensure your password is correct.") + else: + raise + except FailureError as e: + print(e) + exit(1) if __name__ == '__main__': if platform.system() == 'Windows': from ctypes import windll windll.kernel32.SetConsoleMode(windll.kernel32.GetStdHandle(-11), 7) - main() \ No newline at end of file diff --git a/decrypt-ha-backup/hacked_secure_tar_file.py b/decrypt-ha-backup/hacked_secure_tar_file.py new file mode 100644 index 0000000..a121288 --- /dev/null +++ b/decrypt-ha-backup/hacked_secure_tar_file.py @@ -0,0 +1,107 @@ +"""Tarfile fileobject handler for encrypted files.""" +import hashlib +import logging +from pathlib import Path +import tarfile +from typing import IO, Optional + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import ( + Cipher, + CipherContext, + algorithms, + modes, +) + +_LOGGER: logging.Logger = logging.getLogger(__name__) + +DEFAULT_BUFSIZE = 10240 + +class HackedSecureTarFile: + """Thsi is a hacked up verison of SecureTarFile that works around a bunch of windows specific issues the library has""" + + def __init__( + self, + name: Path, + key: bytes, + gzip: bool = True, + bufsize: int = DEFAULT_BUFSIZE, + ) -> None: + """Initialize encryption handler.""" + self._file: Optional[IO[bytes]] = None + self._name: Path = name + self._bufsize: int = bufsize + + # Tarfile options + self._tar: Optional[tarfile.TarFile] = None + self._tar_mode: str = f"r|gz" if gzip else f"r|" + + # Encryption/Description + self._aes: Optional[Cipher] = None + self._key = key + + # Function helper + self._decrypt: Optional[CipherContext] = None + self._init = True + + def __enter__(self) -> tarfile.TarFile: + try: + # Encrypted/Decryped Tarfile + self._file = open(self._name, "rb") + + # Extract IV for CBC + cbc_rand = self._file.read(16) + + # Create Cipher + self._aes = Cipher( + algorithms.AES(self._key), + modes.CBC(_generate_iv(self._key, cbc_rand)), + backend=default_backend(), + ) + self._decrypt = self._aes.decryptor() + self._tar = tarfile.open( + fileobj=self, mode=self._tar_mode, dereference=False, bufsize=self._bufsize + ) + return self._tar + except: + self.__exit__(None, None, None) + raise + + def __exit__(self, exc_type, exc_value, traceback) -> None: + """Close file.""" + if self._tar: + self._tar.close() + self._tar = None + if self._file: + self._file.close() + self._file = None + + def read(self, size: int = 0) -> bytes: + """Read data.""" + assert self._decrypt is not None + assert self._file is not None + data = self._decrypt.update(self._file.read(size)) + return data + + @property + def path(self) -> Path: + """Return path object of tarfile.""" + return self._name + + @property + def size(self) -> float: + """Return backup size.""" + if not self._name.is_file(): + return 0 + return round(self._name.stat().st_size / 1_048_576, 2) # calc mbyte + + +def _generate_iv(key: bytes, salt: bytes) -> bytes: + """Generate an iv from data.""" + temp_iv = key + salt + for _ in range(100): + temp_iv = hashlib.sha256(temp_iv).digest() + return temp_iv[:16] + + + diff --git a/scripts/publish.sh b/scripts/publish.sh index ac68b5b..fab9c40 100644 --- a/scripts/publish.sh +++ b/scripts/publish.sh @@ -1,5 +1,7 @@ +#!/bin/sh + pip install -q --upgrade setupext-janitor twine build -python3 setup.py clean --dist --eggs +python3 setup.py clean python3 -m build keyring --disable python3 -m twine upload dist/* \ No newline at end of file diff --git a/setup.py b/setup.py index 6c920d4..75412f9 100644 --- a/setup.py +++ b/setup.py @@ -10,12 +10,12 @@ 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.9', ], - packages=find_packages(include=['decrypt-ha-backup']), - version='2022.7.14.4', + packages=find_packages(), + version='2023.10.28.1', description='Decryption utility for Home Assistant backups', long_description=long_description, long_description_content_type="text/markdown", - install_requires=["securetar"], + install_requires=["cryptography"], author="Stephen Beechen", author_email="stephen@beechens.com", python_requires=">=3.9",