Skip to content

Commit

Permalink
Handle unicode characters carefully and resolve a dumb widows only bug
Browse files Browse the repository at this point in the history
  • Loading branch information
sabeechen committed Oct 28, 2023
1 parent ee31341 commit db807a2
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 28 deletions.
68 changes: 43 additions & 25 deletions decrypt-ha-backup/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,27 @@
import random
import getpass
from typing import IO
import securetar
import tempfile
import platform
from pathlib import Path
from hacked_secure_tar_file import HackedSecureTarFile

class FailureError(Exception):
"""Indicates a failure with a user readable message attached"""

def __init__(self, message: str) -> None:
"""Initialize failure error."""
super().__init__(message)
self.message = message

def __str__(self) -> str:
"""Return string representation of failure error."""
return self.message

#PATH = "EncryptedFolders.tar"
PATH = "EncryptedFolders.tar"
PASSWORD = "orcsorcs"

def password_to_key(password: str) -> bytes:
"""Generate a AES Key from password."""
key: bytes = password.encode()
key: bytes = password.encode("utf-8")
for _ in range(100):
key = hashlib.sha256(key).digest()
return key[:16]
Expand All @@ -31,15 +40,8 @@ def key_to_iv(key: bytes) -> bytes:
key = hashlib.sha256(key).digest()
return key[:16]

def _generate_iv(key: bytes, salt: bytes) -> bytes:
"""Generate an iv from data."""
temp_iv = key + salt
for _ in range(100):
temp_iv = hashlib.sha256(temp_iv).digest()
return temp_iv[:16]

def overwrite(line: str):
sys.stdout.write(f"\r{line}\033[K")
sys.stdout.write(f"\r{line.encode('utf-8', 'replace').decode()}\033[K")

def readTarMembers(tar: tarfile.TarFile):
while(True):
Expand All @@ -57,6 +59,8 @@ def __init__(self, tarfile: tarfile.TarFile):
except KeyError:
self._configMember = self._tarfile.getmember("./backup.json")
json_file = self._tarfile.extractfile(self._configMember)
if not json_file:
raise FailureError("Backup doesn't contain a metadata file named 'snapshot.json' or 'backup.json'")
self._config = json.loads(json_file.read())
json_file.close()
self._items = [BackupItem(entry['slug'], entry['name'], self) for entry in self._config.get("addons")]
Expand Down Expand Up @@ -118,6 +122,8 @@ def __init__(self, slug, name, backup: Backup):
self._name = name
self._backup = backup
self._info = self._backup._tarfile.getmember(self.fileName)
if not self._info:
raise FailureError(f"Backup file doesn't contain a file for {self._name} with the name '{self.fileName}'")

@property
def fileName(self):
Expand All @@ -141,7 +147,10 @@ def size(self):
return self.info.size

def _open(self):
return self._backup._tarfile.extractfile(self.info)
data = self._backup._tarfile.extractfile(self.info)
if not data:
raise FailureError(f"Backup file doesn't contain a file named {self.info.name}")
return data

def _extractTo(self, file: IO):
progress = 0
Expand All @@ -154,9 +163,7 @@ def _extractTo(self, file: IO):
file.write(data)
overwrite(f"Extracting '{self.name}' {round(100 * progress/self.size, 1)}%")
progress += len(data)
file.flush()
overwrite(f"Extracting '{self.name}' {round(100 * progress/self.size, 1)}%")
file.seek(0)
print()

def _copyTar(self, source: tarfile.TarFile, dest: tarfile.TarFile):
Expand All @@ -167,12 +174,13 @@ def _copyTar(self, source: tarfile.TarFile, dest: tarfile.TarFile):
else:
dest.addfile(member, source.extractfile(member))

def addTo(self, output: tarfile, key: bytes):
with tempfile.NamedTemporaryFile() as extracted:
self._extractTo(extracted)
overwrite(f"Decrypting '{self.name}'")
extracted.seek(0)
with securetar.SecureTarFile(Path(extracted.name), "r", key=key, gzip=self._backup.compressed) as decrypted:
def addTo(self, temp_folder: str, output: tarfile.TarFile, key: bytes):
temp_file = os.path.join(temp_folder, os.urandom(24).hex())
try:
with open(temp_file, "wb") as f:
self._extractTo(f)
overwrite(f"Decrypting '{self.name}'")
with HackedSecureTarFile(Path(temp_file), key=key, gzip=self._backup.compressed) as decrypted:
with tempfile.NamedTemporaryFile() as processed:
tarmode = "w|" + ("gz" if self._backup.compressed else "")
with tarfile.open(f"{self.slug}.tar", tarmode, fileobj=processed) as archivetar:
Expand All @@ -187,6 +195,10 @@ def addTo(self, output: tarfile, key: bytes):
output.addfile(info, processed)
overwrite(f"Saving '{self.name}' done")
print()
finally:
if os.path.isfile(temp_file):
os.remove(temp_file)
pass


def main():
Expand All @@ -209,12 +221,13 @@ def main():
resp = input(f"The output file '{args.output_file}' already exists, do you want to overwrite it [y/n]?")
if not resp.startswith("y"):
print("Aborted")
exit()
exit(1)

if args.password is None:
# ask fro a password
args.password = getpass.getpass("Backup Password:")

temp_dir = tempfile.gettempdir()
try:
with tarfile.open(Path(args.backup_file), "r:") as backup_file:
backup = Backup(backup_file)
Expand All @@ -226,10 +239,11 @@ def main():
return

_key = password_to_key(args.password)
print(_key)

with tarfile.open(args.output_file, "w:") as output:
for archive in backup.items:
archive.addTo(output, _key)
archive.addTo(temp_dir, output, _key)

# Add the modified backup config
backup.addModifiedConfig(output)
Expand All @@ -238,11 +252,15 @@ def main():
except tarfile.ReadError as e:
if "not a gzip file" in str(e):
print("The file could not be read as a gzip file. Please ensure your password is correct.")
else:
raise
except FailureError as e:
print(e)
exit(1)


if __name__ == '__main__':
if platform.system() == 'Windows':
from ctypes import windll
windll.kernel32.SetConsoleMode(windll.kernel32.GetStdHandle(-11), 7)

main()
107 changes: 107 additions & 0 deletions decrypt-ha-backup/hacked_secure_tar_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Tarfile fileobject handler for encrypted files."""
import hashlib
import logging
from pathlib import Path
import tarfile
from typing import IO, Optional

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import (
Cipher,
CipherContext,
algorithms,
modes,
)

_LOGGER: logging.Logger = logging.getLogger(__name__)

DEFAULT_BUFSIZE = 10240

class HackedSecureTarFile:
"""Thsi is a hacked up verison of SecureTarFile that works around a bunch of windows specific issues the library has"""

def __init__(
self,
name: Path,
key: bytes,
gzip: bool = True,
bufsize: int = DEFAULT_BUFSIZE,
) -> None:
"""Initialize encryption handler."""
self._file: Optional[IO[bytes]] = None
self._name: Path = name
self._bufsize: int = bufsize

# Tarfile options
self._tar: Optional[tarfile.TarFile] = None
self._tar_mode: str = f"r|gz" if gzip else f"r|"

# Encryption/Description
self._aes: Optional[Cipher] = None
self._key = key

# Function helper
self._decrypt: Optional[CipherContext] = None
self._init = True

def __enter__(self) -> tarfile.TarFile:
try:
# Encrypted/Decryped Tarfile
self._file = open(self._name, "rb")

# Extract IV for CBC
cbc_rand = self._file.read(16)

# Create Cipher
self._aes = Cipher(
algorithms.AES(self._key),
modes.CBC(_generate_iv(self._key, cbc_rand)),
backend=default_backend(),
)
self._decrypt = self._aes.decryptor()
self._tar = tarfile.open(
fileobj=self, mode=self._tar_mode, dereference=False, bufsize=self._bufsize
)
return self._tar
except:
self.__exit__(None, None, None)
raise

def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Close file."""
if self._tar:
self._tar.close()
self._tar = None
if self._file:
self._file.close()
self._file = None

def read(self, size: int = 0) -> bytes:
"""Read data."""
assert self._decrypt is not None
assert self._file is not None
data = self._decrypt.update(self._file.read(size))
return data

@property
def path(self) -> Path:
"""Return path object of tarfile."""
return self._name

@property
def size(self) -> float:
"""Return backup size."""
if not self._name.is_file():
return 0
return round(self._name.stat().st_size / 1_048_576, 2) # calc mbyte


def _generate_iv(key: bytes, salt: bytes) -> bytes:
"""Generate an iv from data."""
temp_iv = key + salt
for _ in range(100):
temp_iv = hashlib.sha256(temp_iv).digest()
return temp_iv[:16]



4 changes: 3 additions & 1 deletion scripts/publish.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/sh

pip install -q --upgrade setupext-janitor twine build
python3 setup.py clean --dist --eggs
python3 setup.py clean
python3 -m build
keyring --disable
python3 -m twine upload dist/*
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
'Programming Language :: Python :: 3.9',
],
packages=find_packages(include=['decrypt-ha-backup']),
version='2022.7.14.4',
version='2023.10.28.1',
description='Decryption utility for Home Assistant backups',
long_description=long_description,
long_description_content_type="text/markdown",
install_requires=["securetar"],
install_requires=["cryptography"],
author="Stephen Beechen",
author_email="[email protected]",
python_requires=">=3.9",
Expand Down

0 comments on commit db807a2

Please sign in to comment.