From 2969761657a9df4b6c67af4d04d41b7383d0e0e6 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 31 Dec 2024 01:55:18 +0100 Subject: [PATCH] Async SSH (#113) * async ssh * fix ssh * try ha build * cleaner script --- .github/workflows/docker-publish-multi.yml | 29 ++ backend/app/api/frames.py | 4 +- backend/app/tasks/deploy_frame.py | 574 +++++++++++++-------- backend/app/tasks/reset_frame.py | 6 +- backend/app/tasks/restart_frame.py | 7 +- backend/app/tasks/stop_frame.py | 9 +- backend/app/tasks/worker.py | 27 +- backend/app/utils/ssh_utils.py | 255 +++++---- backend/requirements.in | 1 + backend/requirements.txt | 3 + gpt.sh | 26 +- 11 files changed, 572 insertions(+), 369 deletions(-) diff --git a/.github/workflows/docker-publish-multi.yml b/.github/workflows/docker-publish-multi.yml index b4bfd940..7b6a096f 100644 --- a/.github/workflows/docker-publish-multi.yml +++ b/.github/workflows/docker-publish-multi.yml @@ -148,3 +148,32 @@ jobs: tags: | frameos/frameos:${{ env.BRANCH_NAME }}-${{ env.SHORT_SHA }} frameos/frameos:latest + + update-addon-repo: + name: Update Home Assistant Addon + needs: push-multiarch + runs-on: ubuntu-latest + steps: + - name: Checkout frameos-home-assistant-addon + uses: actions/checkout@v2 + with: + repository: frameos/frameos-home-assistant-addon + token: ${{ secrets.ACTIONS_WRITE_TOKEN }} + ref: main + path: home-assistant-addon + + - name: Update version in config.yml + run: | + cd home-assistant-addon/frameos + echo "Updating version in config.yml to main-${{ env.SHORT_SHA }}" + sed -i "s/^version: .*/version: main-${{ env.SHORT_SHA }}/" config.yml + + - name: Commit changes + uses: EndBug/add-and-commit@v9 + with: + message: "Update FrameOS version to main-${{ env.SHORT_SHA }}" + add: "." + push: true + author_name: FrameOS Bot + author_email: git@frameos.net + \ No newline at end of file diff --git a/backend/app/api/frames.py b/backend/app/api/frames.py index 26edca23..70e0c935 100644 --- a/backend/app/api/frames.py +++ b/backend/app/api/frames.py @@ -203,7 +203,7 @@ async def api_frame_get_assets(id: int, db: Session = Depends(get_db), redis: Re command = f"find {assets_path} -type f -exec stat --format='%s %Y %n' {{}} +" output: list[str] = [] await exec_command(db, redis, frame, ssh, command, output, log_output=False) - remove_ssh_connection(ssh) + await remove_ssh_connection(ssh) assets = [] for line in output: @@ -275,7 +275,7 @@ async def api_frame_get_asset(id: int, request: Request, db: Session = Depends(g } ) finally: - remove_ssh_connection(ssh) + await remove_ssh_connection(ssh) except HTTPException: raise except Exception as e: diff --git a/backend/app/tasks/deploy_frame.py b/backend/app/tasks/deploy_frame.py index 224d5ba5..7a3f0cac 100644 --- a/backend/app/tasks/deploy_frame.py +++ b/backend/app/tasks/deploy_frame.py @@ -9,14 +9,12 @@ import tempfile from typing import Any -from arq import ArqRedis as Redis +import asyncssh from packaging import version - import platform -from io import StringIO - -from scp import SCPClient +from arq import ArqRedis as Redis +from sqlalchemy.orm import Session from app.codegen.drivers_nim import write_drivers_nim from app.codegen.scene_nim import write_scene_nim, write_scenes_nim @@ -26,11 +24,12 @@ from app.models.log import new_log as log from app.models.frame import Frame, update_frame, get_frame_json from app.utils.ssh_utils import get_ssh_connection, exec_command, remove_ssh_connection, exec_local_command -from sqlalchemy.orm import Session + async def deploy_frame(id: int, redis: Redis): await redis.enqueue_job("deploy_frame", id=id) + async def deploy_frame_task(ctx: dict[str, Any], id: int): db: Session = ctx['db'] redis: Redis = ctx['redis'] @@ -38,38 +37,39 @@ async def deploy_frame_task(ctx: dict[str, Any], id: int): ssh = None try: frame = db.get(Frame, id) - - if frame is None: + if not frame: raise Exception("Frame not found") - if frame.scenes is None or len(frame.scenes) == 0: - raise Exception("You must have at least one installed scene to deploy a frame.") + if not frame.scenes or len(frame.scenes) == 0: + raise Exception("You must have at least one installed scene to deploy.") if frame.status == 'deploying': - raise Exception("Already deploying, will not deploy again. Request again to force deploy.") + raise Exception("Already deploying. Request again to force redeploy.") frame.status = 'deploying' await update_frame(db, redis, frame) - # TODO: add the concept of builds into the backend (track each build in the database) - build_id = ''.join(random.choice(string.ascii_lowercase) for i in range(12)) + build_id = ''.join(random.choice(string.ascii_lowercase) for _ in range(12)) await log(db, redis, id, "stdout", f"Deploying frame {frame.name} with build id {build_id}") nim_path = find_nim_v2() ssh = await get_ssh_connection(db, redis, frame) - async def install_if_necessary(package: str, raise_on_error = True) -> int: - """If a package is not installed, install it.""" - return await exec_command(db, redis, frame, ssh, f"dpkg -l | grep -q \"^ii {package}\" || sudo apt-get install -y {package}", raise_on_error=raise_on_error) + async def install_if_necessary(pkg: str, raise_on_error=True) -> int: + return await exec_command( + db, redis, frame, ssh, + f"dpkg -l | grep -q \"^ii {pkg}\" || sudo apt-get install -y {pkg}", + raise_on_error=raise_on_error + ) with tempfile.TemporaryDirectory() as temp_dir: await log(db, redis, id, "stdout", "- Getting target architecture") uname_output: list[str] = [] await exec_command(db, redis, frame, ssh, "uname -m", uname_output) arch = "".join(uname_output).strip() - if arch == "aarch64" or arch == "arm64": + if arch in ("aarch64", "arm64"): cpu = "arm64" - elif arch == "armv6l" or arch == "armv7l": + elif arch in ("armv6l", "armv7l"): cpu = "arm" elif arch == "i386": cpu = "i386" @@ -80,99 +80,192 @@ async def install_if_necessary(package: str, raise_on_error = True) -> int: try: mem_output: list[str] = [] await exec_command(db, redis, frame, ssh, "free -m", mem_output) - total_memory = int(mem_output[1].split()[1]) + total_memory = int(mem_output[1].split()[1]) # line 1 => "Mem: ... 991 ..." except Exception as e: await log(db, redis, id, "stderr", str(e)) low_memory = total_memory < 512 drivers = drivers_for_device(frame.device) - # create a build .tar.gz + # 1. Create build tar.gz locally await log(db, redis, id, "stdout", "- Copying build folders") build_dir, source_dir = create_build_folders(temp_dir, build_id) await log(db, redis, id, "stdout", "- Applying local modifications") await make_local_modifications(db, redis, frame, source_dir) await log(db, redis, id, "stdout", "- Creating build archive") - archive_path = await create_local_build_archive(db, redis, frame, build_dir, build_id, nim_path, source_dir, temp_dir, cpu) + archive_path = await create_local_build_archive( + db, redis, frame, build_dir, build_id, nim_path, source_dir, temp_dir, cpu + ) if low_memory: - await log(db, redis, id, "stdout", "- Low memory detected, stopping FrameOS for compilation") + await log(db, redis, id, "stdout", "- Low memory device, stopping FrameOS for compile") await exec_command(db, redis, frame, ssh, "sudo service frameos stop", raise_on_error=False) - with SCPClient(ssh.get_transport()) as scp: - # build the release on the server - await install_if_necessary("ntp") - await install_if_necessary("build-essential") - if drivers.get("evdev"): - await install_if_necessary("libevdev-dev") - if drivers.get('waveshare') or drivers.get('gpioButton'): - if await exec_command(db, redis, frame, ssh, '[[ -f "/usr/local/include/lgpio.h" || -f "/usr/include/lgpio.h" ]] && exit 0 || exit 1', raise_on_error=False) != 0: - if await install_if_necessary("liblgpio-dev", raise_on_error=False) != 0: - await log(db, redis, id, "stdout", "--> Could not find liblgpio-dev package, installing from source") - command = "if [ ! -f /usr/local/include/lgpio.h ]; then "\ - "rm -rf /tmp/lgpio-install && "\ - "mkdir -p /tmp/lgpio-install && "\ - "cd /tmp/lgpio-install && "\ - "wget -q -O v0.2.2.tar.gz https://github.com/joan2937/lg/archive/refs/tags/v0.2.2.tar.gz && "\ - "tar -xzf v0.2.2.tar.gz && "\ - "cd lg-0.2.2 && "\ - "make && "\ - "sudo make install && "\ - "sudo rm -rf /tmp/lgpio-install; "\ - "fi" - await exec_command(db, redis, frame, ssh, command) - - await exec_command(db, redis, frame, ssh, "if [ ! -d /srv/frameos/ ]; then sudo mkdir -p /srv/frameos/ && sudo chown $(whoami):$(whoami) /srv/frameos/; fi") - await exec_command(db, redis, frame, ssh, "mkdir -p /srv/frameos/build/ /srv/frameos/logs/") - await log(db, redis, id, "stdout", f"> add /srv/frameos/build/build_{build_id}.tar.gz") - scp.put(archive_path, f"/srv/frameos/build/build_{build_id}.tar.gz") - await exec_command(db, redis, frame, ssh, f"cd /srv/frameos/build && tar -xzf build_{build_id}.tar.gz && rm build_{build_id}.tar.gz") - await exec_command(db, redis, frame, ssh, f"cd /srv/frameos/build/build_{build_id} && PARALLEL_MEM=$(awk '/MemTotal/{{printf \"%.0f\\n\", $2/1024/250}}' /proc/meminfo) && PARALLEL=$(($PARALLEL_MEM < $(nproc) ? $PARALLEL_MEM : $(nproc))) && make -j$PARALLEL") - await exec_command(db, redis, frame, ssh, f"mkdir -p /srv/frameos/releases/release_{build_id}") - await exec_command(db, redis, frame, ssh, f"cp /srv/frameos/build/build_{build_id}/frameos /srv/frameos/releases/release_{build_id}/frameos") - await log(db, redis, id, "stdout", f"> add /srv/frameos/releases/release_{build_id}/frame.json") - scp.putfo(StringIO(json.dumps(get_frame_json(db, frame), indent=4) + "\n"), f"/srv/frameos/releases/release_{build_id}/frame.json") - - # TODO: abstract driver-specific install steps - # TODO: abstract vendor logic - if inkyPython := drivers.get("inkyPython"): - await exec_command(db, redis, frame, ssh, f"mkdir -p /srv/frameos/vendor && cp -r /srv/frameos/build/build_{build_id}/vendor/inkyPython /srv/frameos/vendor/") - await install_if_necessary("python3-pip") - await install_if_necessary("python3-venv") - await exec_command(db, redis, frame, ssh, f"cd /srv/frameos/vendor/{inkyPython.vendor_folder} && ([ ! -d env ] && python3 -m venv env || echo 'env exists') && (sha256sum -c requirements.txt.sha256sum 2>/dev/null || (echo '> env/bin/pip3 install -r requirements.txt' && env/bin/pip3 install -r requirements.txt && sha256sum requirements.txt > requirements.txt.sha256sum))") - - if inkyHyperPixel2r := drivers.get("inkyHyperPixel2r"): - await exec_command(db, redis, frame, ssh, f"mkdir -p /srv/frameos/vendor && cp -r /srv/frameos/build/build_{build_id}/vendor/inkyHyperPixel2r /srv/frameos/vendor/") - await install_if_necessary("python3-dev") - await install_if_necessary("python3-pip") - await install_if_necessary("python3-venv") - await exec_command(db, redis, frame, ssh, f"cd /srv/frameos/vendor/{inkyHyperPixel2r.vendor_folder} && ([ ! -d env ] && python3 -m venv env || echo 'env exists') && (sha256sum -c requirements.txt.sha256sum 2>/dev/null || (echo '> env/bin/pip3 install -r requirements.txt' && env/bin/pip3 install -r requirements.txt && sha256sum requirements.txt > requirements.txt.sha256sum))") - - # add frameos.service - with open("../frameos/frameos.service", "r") as file: - service_contents = file.read().replace("%I", frame.ssh_user) - with SCPClient(ssh.get_transport()) as scp: - scp.putfo(StringIO(service_contents), f"/srv/frameos/releases/release_{build_id}/frameos.service") - await exec_command(db, redis, frame, ssh, f"mkdir -p /srv/frameos/state && ln -s /srv/frameos/state /srv/frameos/releases/release_{build_id}/state") - await exec_command(db, redis, frame, ssh, f"sudo cp /srv/frameos/releases/release_{build_id}/frameos.service /etc/systemd/system/frameos.service") - await exec_command(db, redis, frame, ssh, "sudo chown root:root /etc/systemd/system/frameos.service") - await exec_command(db, redis, frame, ssh, "sudo chmod 644 /etc/systemd/system/frameos.service") - - # swap out the release - await exec_command(db, redis, frame, ssh, f"rm -rf /srv/frameos/current && ln -s /srv/frameos/releases/release_{build_id} /srv/frameos/current") - - # Make sure /srv/assets is writable, if not create it and assign to our user + # 2. Remote steps + await install_if_necessary("ntp") + await install_if_necessary("build-essential") + + if drivers.get("evdev"): + await install_if_necessary("libevdev-dev") + + if drivers.get("waveshare") or drivers.get("gpioButton"): + check_lgpio = await exec_command( + db, redis, frame, ssh, + '[[ -f "/usr/local/include/lgpio.h" || -f "/usr/include/lgpio.h" ]] && exit 0 || exit 1', + raise_on_error=False + ) + if check_lgpio != 0: + # Try installing liblgpio-dev + if await install_if_necessary("liblgpio-dev", raise_on_error=False) != 0: + await log(db, redis, id, "stdout", + "--> Could not find liblgpio-dev. Installing from source.") + command = ( + "if [ ! -f /usr/local/include/lgpio.h ]; then " + " rm -rf /tmp/lgpio-install && " + " mkdir -p /tmp/lgpio-install && " + " cd /tmp/lgpio-install && " + " wget -q -O v0.2.2.tar.gz https://github.com/joan2937/lg/archive/refs/tags/v0.2.2.tar.gz && " + " tar -xzf v0.2.2.tar.gz && " + " cd lg-0.2.2 && " + " make && " + " sudo make install && " + " sudo rm -rf /tmp/lgpio-install; " + "fi" + ) + await exec_command(db, redis, frame, ssh, command) + + # Ensure /srv/frameos + await exec_command(db, redis, frame, ssh, + "if [ ! -d /srv/frameos/ ]; then " + " sudo mkdir -p /srv/frameos/ && sudo chown $(whoami):$(whoami) /srv/frameos/; " + "fi") + + await exec_command(db, redis, frame, ssh, "mkdir -p /srv/frameos/build/ /srv/frameos/logs/") + await log(db, redis, id, "stdout", f"> add /srv/frameos/build/build_{build_id}.tar.gz") + + # 3. Upload the local tarball + await asyncssh.scp( + archive_path, + (ssh, f"/srv/frameos/build/build_{build_id}.tar.gz"), + recurse=False + ) + + # Unpack & compile on device + await exec_command(db, redis, frame, ssh, + f"cd /srv/frameos/build && tar -xzf build_{build_id}.tar.gz && rm build_{build_id}.tar.gz") + await exec_command(db, redis, frame, ssh, + f"cd /srv/frameos/build/build_{build_id} && " + "PARALLEL_MEM=$(awk '/MemTotal/{printf \"%.0f\\n\", $2/1024/250}' /proc/meminfo) && " + "PARALLEL=$(($PARALLEL_MEM < $(nproc) ? $PARALLEL_MEM : $(nproc))) && " + "make -j$PARALLEL") + + await exec_command(db, redis, frame, ssh, f"mkdir -p /srv/frameos/releases/release_{build_id}") + await exec_command(db, redis, frame, ssh, + f"cp /srv/frameos/build/build_{build_id}/frameos " + f"/srv/frameos/releases/release_{build_id}/frameos") + + # 4. Upload frame.json using a TEMP FILE approach + frame_json_data = (json.dumps(get_frame_json(db, frame), indent=4) + "\n").encode('utf-8') + with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmpf: + local_json_path = tmpf.name + tmpf.write(frame_json_data) + await asyncssh.scp( + local_json_path, (ssh, f"/srv/frameos/releases/release_{build_id}/frame.json"), + recurse=False + ) + os.remove(local_json_path) # remove local temp file + await log(db, redis, id, "stdout", f"> add /srv/frameos/releases/release_{build_id}/frame.json") + + # Driver-specific vendor steps + if inkyPython := drivers.get("inkyPython"): + await exec_command(db, redis, frame, ssh, + f"mkdir -p /srv/frameos/vendor && " + f"cp -r /srv/frameos/build/build_{build_id}/vendor/inkyPython /srv/frameos/vendor/") + await install_if_necessary("python3-pip") + await install_if_necessary("python3-venv") + await exec_command(db, redis, frame, ssh, + f"cd /srv/frameos/vendor/{inkyPython.vendor_folder} && " + "([ ! -d env ] && python3 -m venv env || echo 'env exists') && " + "(sha256sum -c requirements.txt.sha256sum 2>/dev/null || " + "(echo '> env/bin/pip3 install -r requirements.txt' && " + "env/bin/pip3 install -r requirements.txt && " + "sha256sum requirements.txt > requirements.txt.sha256sum))") + + if inkyHyperPixel2r := drivers.get("inkyHyperPixel2r"): + await exec_command(db, redis, frame, ssh, + f"mkdir -p /srv/frameos/vendor && " + f"cp -r /srv/frameos/build/build_{build_id}/vendor/inkyHyperPixel2r /srv/frameos/vendor/") + await install_if_necessary("python3-dev") + await install_if_necessary("python3-pip") + await install_if_necessary("python3-venv") + await exec_command(db, redis, frame, ssh, + f"cd /srv/frameos/vendor/{inkyHyperPixel2r.vendor_folder} && " + "([ ! -d env ] && python3 -m venv env || echo 'env exists') && " + "(sha256sum -c requirements.txt.sha256sum 2>/dev/null || " + "(echo '> env/bin/pip3 install -r requirements.txt' && " + "env/bin/pip3 install -r requirements.txt && " + "sha256sum requirements.txt > requirements.txt.sha256sum))") + + # 5. Upload frameos.service with a TEMP FILE approach + with open("../frameos/frameos.service", "r") as f: + service_contents = f.read().replace("%I", frame.ssh_user) + service_data = service_contents.encode('utf-8') + with tempfile.NamedTemporaryFile(suffix=".service", delete=False) as tmpservice: + local_service_path = tmpservice.name + tmpservice.write(service_data) + await asyncssh.scp( + local_service_path, + (ssh, f"/srv/frameos/releases/release_{build_id}/frameos.service"), + recurse=False + ) + os.remove(local_service_path) + + await exec_command(db, redis, frame, ssh, + f"mkdir -p /srv/frameos/state && ln -s /srv/frameos/state " + f"/srv/frameos/releases/release_{build_id}/state") + await exec_command(db, redis, frame, ssh, + f"sudo cp /srv/frameos/releases/release_{build_id}/frameos.service " + f"/etc/systemd/system/frameos.service") + await exec_command(db, redis, frame, ssh, "sudo chown root:root /etc/systemd/system/frameos.service") + await exec_command(db, redis, frame, ssh, "sudo chmod 644 /etc/systemd/system/frameos.service") + + # 6. Link new release + await exec_command(db, redis, frame, ssh, + f"rm -rf /srv/frameos/current && " + f"ln -s /srv/frameos/releases/release_{build_id} /srv/frameos/current") + + # Ensure /srv/assets assets_path = frame.assets_path or "/srv/assets" - await exec_command(db, redis, frame, ssh, f"if [ ! -d {assets_path} ]; then sudo mkdir -p {assets_path} && sudo chown $(whoami):$(whoami) {assets_path}; elif [ ! -w {assets_path} ]; then echo 'User does not have write access to {assets_path}. Changing ownership...'; sudo chown $(whoami):$(whoami) {assets_path}; fi") - - # clean old build and release and cache folders - await exec_command(db, redis, frame, ssh, "cd /srv/frameos/build && ls -dt1 build_* | tail -n +11 | xargs rm -rf") - await exec_command(db, redis, frame, ssh, "cd /srv/frameos/build/cache && find . -type f \( -atime +0 -a -mtime +0 \) | xargs rm -rf") - await exec_command(db, redis, frame, ssh, "cd /srv/frameos/releases && ls -dt1 release_* | grep -v \"$(basename $(readlink ../current))\" | tail -n +11 | xargs rm -rf") - + await exec_command( + db, redis, frame, ssh, + f"if [ ! -d {assets_path} ]; then " + f" sudo mkdir -p {assets_path} && sudo chown $(whoami):$(whoami) {assets_path}; " + f"elif [ ! -w {assets_path} ]; then " + f" echo 'User lacks write access to {assets_path}. Fixing...'; " + f" sudo chown $(whoami):$(whoami) {assets_path}; " + f"fi" + ) + + # Clean old builds + await exec_command(db, redis, frame, ssh, + "cd /srv/frameos/build && ls -dt1 build_* | tail -n +11 | xargs rm -rf") + await exec_command(db, redis, frame, ssh, + "cd /srv/frameos/build/cache && find . -type f \\( -atime +0 -a -mtime +0 \\) | xargs rm -rf") + await exec_command(db, redis, frame, ssh, + "cd /srv/frameos/releases && " + "ls -dt1 release_* | grep -v \"$(basename $(readlink ../current))\" " + "| tail -n +11 | xargs rm -rf") + + # Additional device config if drivers.get("i2c"): - await exec_command(db, redis, frame, ssh, 'grep -q "^dtparam=i2c_vc=on$" /boot/config.txt || echo "dtparam=i2c_vc=on" | sudo tee -a /boot/config.txt') - await exec_command(db, redis, frame, ssh, 'command -v raspi-config > /dev/null && sudo raspi-config nonint get_i2c | grep -q "1" && { sudo raspi-config nonint do_i2c 0; echo "I2C is now enabled"; } || echo "I2C is already enabled"') + await exec_command(db, redis, frame, ssh, + 'grep -q "^dtparam=i2c_vc=on$" /boot/config.txt ' + '|| echo "dtparam=i2c_vc=on" | sudo tee -a /boot/config.txt') + await exec_command(db, redis, frame, ssh, + 'command -v raspi-config > /dev/null && ' + 'sudo raspi-config nonint get_i2c | grep -q "1" && { ' + ' sudo raspi-config nonint do_i2c 0; echo "I2C enabled"; ' + '} || echo "I2C already enabled"') if drivers.get("spi"): await exec_command(db, redis, frame, ssh, 'sudo raspi-config nonint do_spi 0') @@ -180,10 +273,12 @@ async def install_if_necessary(package: str, raise_on_error = True) -> int: await exec_command(db, redis, frame, ssh, 'sudo raspi-config nonint do_spi 1') if low_memory: - # disable apt-daily-upgrade (sudden +70mb memory usage, might lead a Zero W 2 to endlessly swap) - await exec_command(db, redis, frame, ssh, "sudo systemctl mask apt-daily-upgrade && sudo systemctl mask apt-daily && sudo systemctl disable apt-daily.service apt-daily.timer apt-daily-upgrade.timer apt-daily-upgrade.service") - # # disable swap while we're at it - # await exec_command(db, redis, frame, ssh, "sudo systemctl disable dphys-swapfile.service") + await exec_command( + db, redis, frame, ssh, + "sudo systemctl mask apt-daily-upgrade && " + "sudo systemctl mask apt-daily && " + "sudo systemctl disable apt-daily.service apt-daily.timer apt-daily-upgrade.timer apt-daily-upgrade.service" + ) if frame.reboot and frame.reboot.get('enabled') == 'true': cron_schedule = frame.reboot.get('crontab', '0 0 * * *') @@ -191,11 +286,12 @@ async def install_if_necessary(package: str, raise_on_error = True) -> int: crontab = f"{cron_schedule} root /sbin/shutdown -r now" else: crontab = f"{cron_schedule} root systemctl restart frameos.service" - await exec_command(db, redis, frame, ssh, f"echo '{crontab}' | sudo tee /etc/cron.d/frameos-reboot") + await exec_command(db, redis, frame, ssh, + f"echo '{crontab}' | sudo tee /etc/cron.d/frameos-reboot") else: await exec_command(db, redis, frame, ssh, "sudo rm -f /etc/cron.d/frameos-reboot") - # restart + # restart frame await exec_command(db, redis, frame, ssh, "sudo systemctl daemon-reload") await exec_command(db, redis, frame, ssh, "sudo systemctl enable frameos.service") await exec_command(db, redis, frame, ssh, "sudo systemctl restart frameos.service") @@ -206,15 +302,15 @@ async def install_if_necessary(package: str, raise_on_error = True) -> int: except Exception as e: await log(db, redis, id, "stderr", str(e)) - if frame is not None: + if frame: frame.status = 'uninitialized' await update_frame(db, redis, frame) finally: if ssh is not None: ssh.close() - if frame is not None: + if frame: await log(db, redis, int(frame.id), "stdinfo", "SSH connection closed") - remove_ssh_connection(ssh) + await remove_ssh_connection(ssh) def find_nim_v2(): @@ -223,183 +319,217 @@ def find_nim_v2(): raise Exception("Nim executable not found") nim_version = get_nim_version(nim_path) if not nim_version or nim_version < version.parse("2.0.0"): - raise Exception("Nim version 2.0.0 or higher is required") + raise Exception("Nim 2.0.0 or higher is required") return nim_path def create_build_folders(temp_dir, build_id): build_dir = os.path.join(temp_dir, f"build_{build_id}") source_dir = os.path.join(temp_dir, "frameos") - # 1. copy the frameos folder to the temp folder os.makedirs(source_dir, exist_ok=True) shutil.copytree("../frameos", source_dir, dirs_exist_ok=True) - # 2. make a new build folder with the build_id os.makedirs(build_dir, exist_ok=True) return build_dir, source_dir -async def make_local_modifications(db: Session, redis: Redis, frame: Frame, source_dir: str): +async def make_local_modifications(db: Session, redis: Redis, + frame: Frame, source_dir: str): shutil.rmtree(os.path.join(source_dir, "src", "scenes"), ignore_errors=True) os.makedirs(os.path.join(source_dir, "src", "scenes"), exist_ok=True) - # write all source apps for node_id, sources in get_apps_from_scenes(list(frame.scenes)).items(): app_id = "nodeapp_" + node_id.replace('-', '_') app_dir = os.path.join(source_dir, "src", "apps", app_id) os.makedirs(app_dir, exist_ok=True) - for file, source in sources.items(): - with open(os.path.join(app_dir, file), "w") as file: - file.write(source) + for filename, code in sources.items(): + with open(os.path.join(app_dir, filename), "w") as f: + f.write(code) - # only one scene called "default" for now for scene in frame.scenes: try: scene_source = write_scene_nim(frame, scene) - id = re.sub(r'\W+', '', scene.get('id', 'default')) - with open(os.path.join(source_dir, "src", "scenes", f"scene_{id}.nim"), "w") as file: - file.write(scene_source) + safe_id = re.sub(r'\W+', '', scene.get('id', 'default')) + with open(os.path.join(source_dir, "src", "scenes", f"scene_{safe_id}.nim"), "w") as f: + f.write(scene_source) except Exception as e: - await log(db, redis, int(frame.id), "stderr", f"Error writing scene \"{scene.get('name', '')}\" ({scene.get('id', 'default')}): {e}") + await log(db, redis, int(frame.id), "stderr", + f"Error writing scene \"{scene.get('name','')}\" " + f"({scene.get('id','default')}): {e}") raise - with open(os.path.join(source_dir, "src", "scenes", "scenes.nim"), "w") as file: - file.write(write_scenes_nim(frame)) + + with open(os.path.join(source_dir, "src", "scenes", "scenes.nim"), "w") as f: + f.write(write_scenes_nim(frame)) drivers = drivers_for_device(frame.device) - with open(os.path.join(source_dir, "src", "drivers", "drivers.nim"), "w") as file: - drivers_nim = write_drivers_nim(drivers) - file.write(drivers_nim) + with open(os.path.join(source_dir, "src", "drivers", "drivers.nim"), "w") as f: + f.write(write_drivers_nim(drivers)) + if drivers.get("waveshare"): - with open(os.path.join(source_dir, "src", "drivers", "waveshare", "driver.nim"), "w") as file: - drivers_nim = write_waveshare_driver_nim(drivers) - file.write(drivers_nim) + with open(os.path.join(source_dir, "src", "drivers", "waveshare", "driver.nim"), "w") as wf: + wf.write(write_waveshare_driver_nim(drivers)) + -def compile_line_md5(input: str) -> str: +def compile_line_md5(input_str: str) -> str: words = [] ignore_next = False - # The -I paths contain temporary folders, making this non-deterministic. So we remove them. - for word in input.split(' '): + for word in input_str.split(' '): if word == '-I': ignore_next = True elif ignore_next or word.startswith("-I"): pass else: words.append(word) - encoded_string = " ".join(words).encode() - hash_object = hashlib.md5(encoded_string) - md5_hash = hash_object.hexdigest() - return md5_hash - -async def create_local_build_archive(db: Session, redis: Redis, frame: Frame, build_dir: str, build_id: str, nim_path: str, source_dir: str, temp_dir: str, cpu: str): - # TODO: abstract driver-specific vendor steps + return hashlib.md5(" ".join(words).encode()).hexdigest() + + +async def create_local_build_archive( + db: Session, + redis: Redis, + frame: Frame, + build_dir: str, + build_id: str, + nim_path: str, + source_dir: str, + temp_dir: str, + cpu: str +): drivers = drivers_for_device(frame.device) if inkyPython := drivers.get('inkyPython'): vendor_folder = inkyPython.vendor_folder or "" os.makedirs(os.path.join(build_dir, "vendor"), exist_ok=True) - shutil.copytree(f"../frameos/vendor/{vendor_folder}/", os.path.join(build_dir, "vendor", vendor_folder), dirs_exist_ok=True) + shutil.copytree( + f"../frameos/vendor/{vendor_folder}/", + os.path.join(build_dir, "vendor", vendor_folder), + dirs_exist_ok=True + ) shutil.rmtree(os.path.join(build_dir, "vendor", vendor_folder, "env"), ignore_errors=True) shutil.rmtree(os.path.join(build_dir, "vendor", vendor_folder, "__pycache__"), ignore_errors=True) + if inkyHyperPixel2r := drivers.get('inkyHyperPixel2r'): vendor_folder = inkyHyperPixel2r.vendor_folder or "" os.makedirs(os.path.join(build_dir, "vendor"), exist_ok=True) - shutil.copytree(f"../frameos/vendor/{vendor_folder}/", os.path.join(build_dir, "vendor", vendor_folder), dirs_exist_ok=True) + shutil.copytree( + f"../frameos/vendor/{vendor_folder}/", + os.path.join(build_dir, "vendor", vendor_folder), + dirs_exist_ok=True + ) shutil.rmtree(os.path.join(build_dir, "vendor", vendor_folder, "env"), ignore_errors=True) shutil.rmtree(os.path.join(build_dir, "vendor", vendor_folder, "__pycache__"), ignore_errors=True) - # Tell a white lie - await log(db, redis, int(frame.id), "stdout", "- No cross compilation. Generating source code for compilation on frame.") - - # run "nim c --os:linux --cpu:arm64 --compileOnly --genScript --nimcache:tmp/build_1 src/frameos.nim" - debug_options = "" - if frame.debug: - debug_options = "--lineTrace:on" - status, out, err = await exec_local_command( - db, - redis, - frame, - f"cd {source_dir} && nimble assets -y && nimble setup && {nim_path} compile --os:linux --cpu:{cpu} --compileOnly --genScript --nimcache:{build_dir} {debug_options} src/frameos.nim 2>&1" + await log(db, redis, int(frame.id), "stdout", + "- No cross compilation. Generating source code for compilation on frame.") + + debug_options = "--lineTrace:on" if frame.debug else "" + cmd = ( + f"cd {source_dir} && nimble assets -y && nimble setup && " + f"{nim_path} compile --os:linux --cpu:{cpu} " + f"--compileOnly --genScript --nimcache:{build_dir} " + f"{debug_options} src/frameos.nim 2>&1" ) + + status, out, err = await exec_local_command(db, redis, frame, cmd) if status != 0: - last_line = [line for line in (out or "").split("\n") if line != ''][-1] - if match := re.match(r'^(.*\.nim)\((\d+), (\d+)\),?.*', last_line): - filename = match.group(1) - line_nr = int(match.group(2)) - column = int(match.group(3)) - source_path = os.path.realpath(source_dir) - final_path = os.path.realpath(os.path.join(source_dir, filename)) - if os.path.commonprefix([final_path, source_path]) == source_path: - filename = final_path[len(source_path) + 1:] - with open(final_path, "r") as open_file: - lines = open_file.readlines() - await log(db, redis, int(frame.id), "stdout", f"Error in {filename}:{line_nr}:{column}") - await log(db, redis, int(frame.id), "stdout", f"Line {line_nr}: {lines[line_nr - 1]}") - await log(db, redis, int(frame.id), "stdout", f".......{'.' * (column - 1 + len(str(line_nr)))}^") - else: - await log(db, redis, int(frame.id), "stdout", f"Error in {filename}:{line_nr}:{column}") + lines = (out or "").split("\n") + filtered = [ln for ln in lines if ln.strip()] + if filtered: + last_line = filtered[-1] + match = re.match(r'^(.*\.nim)\((\d+), (\d+)\),?.*', last_line) + if match: + fn = match.group(1) + line_nr = int(match.group(2)) + column = int(match.group(3)) + source_abs = os.path.realpath(source_dir) + final_path = os.path.realpath(os.path.join(source_dir, fn)) + if os.path.commonprefix([final_path, source_abs]) == source_abs: + rel_fn = final_path[len(source_abs) + 1:] + with open(final_path, "r") as of: + all_lines = of.readlines() + await log(db, redis, int(frame.id), "stdout", + f"Error in {rel_fn}:{line_nr}:{column}") + await log(db, redis, int(frame.id), "stdout", + f"Line {line_nr}: {all_lines[line_nr - 1]}") + await log(db, redis, int(frame.id), "stdout", + f".......{'.'*(column - 1 + len(str(line_nr)))}^") + else: + await log(db, redis, int(frame.id), "stdout", + f"Error in {fn}:{line_nr}:{column}") raise Exception("Failed to generate frameos sources") - # Copy the file "nimbase.h" to "build_1/nimbase.h" nimbase_path = find_nimbase_file(nim_path) if not nimbase_path: raise Exception("nimbase.h not found") + shutil.copy(nimbase_path, os.path.join(build_dir, "nimbase.h")) if waveshare := drivers.get('waveshare'): if waveshare.variant: variant_folder = get_variant_folder(waveshare.variant) util_files = ["Debug.h", "DEV_Config.c", "DEV_Config.h"] - for file in util_files: - from_path = os.path.join(source_dir, "src", "drivers", "waveshare", variant_folder, file) - to_path = os.path.join(build_dir, file) - shutil.copy(from_path, to_path) - - if waveshare.variant in ["EPD_2in9b", "EPD_2in9c", "EPD_2in13b", "EPD_2in13c", "EPD_4in2b", "EPD_4in2c", "EPD_5in83b", "EPD_5in83c", "EPD_7in5b", "EPD_7in5c"]: + for uf in util_files: + shutil.copy( + os.path.join(source_dir, "src", "drivers", "waveshare", variant_folder, uf), + os.path.join(build_dir, uf) + ) + + # color e-paper variants + if waveshare.variant in [ + "EPD_2in9b", "EPD_2in9c", "EPD_2in13b", "EPD_2in13c", + "EPD_4in2b", "EPD_4in2c", "EPD_5in83b", "EPD_5in83c", + "EPD_7in5b", "EPD_7in5c" + ]: c_file = re.sub(r'[bc]', 'bc', waveshare.variant) variant_files = [f"{waveshare.variant}.nim", f"{c_file}.c", f"{c_file}.h"] else: variant_files = [f"{waveshare.variant}.nim", f"{waveshare.variant}.c", f"{waveshare.variant}.h"] - for file in variant_files: - shutil.copy(os.path.join(source_dir, "src", "drivers", "waveshare", variant_folder, file), os.path.join(build_dir, file)) + for vf in variant_files: + shutil.copy( + os.path.join(source_dir, "src", "drivers", "waveshare", variant_folder, vf), + os.path.join(build_dir, vf) + ) - # Create Makefile - with open(os.path.join(build_dir, "Makefile"), "w") as makefile_file: - # Read the compilation flags from the generated script + with open(os.path.join(build_dir, "Makefile"), "w") as mk: script_path = os.path.join(build_dir, "compile_frameos.sh") linker_flags = ["-pthread", "-lm", "-lrt", "-ldl"] compiler_flags: list[str] = [] - with open(script_path, "r") as script: - lines = script.readlines() - for line in lines: + with open(script_path, "r") as sc: + lines_sc = sc.readlines() + for line in lines_sc: if " -o frameos " in line and " -l" in line: - linker_flags = [flag.strip() for flag in line.split(' ') if flag.startswith('-') and flag != '-o'] - elif " -c " in line and len(compiler_flags) == 0: - compiler_flags = [flag for flag in line.split(' ') if flag.startswith('-') and not flag.startswith('-I') and flag not in ['-o', '-c', '-D']] - - # Read the Makefile from ../frameos/tools/nimc.Makefile - with open(os.path.join(source_dir, "tools", "nimc.Makefile"), "r") as makefile: - lines = makefile.readlines() - for line in lines: - if line.startswith("LIBS = "): - line = "LIBS = -L. " + (" ".join(linker_flags)) + "\n" - if line.startswith("CFLAGS = "): - line = "CFLAGS = " + (" ".join([f for f in compiler_flags if f != '-c'])) + "\n" - makefile_file.write(line) - - # 7. Zip it up "(cd tmp && tar -czf ./build_1.tar.gz build_1)" + linker_flags = [ + fl.strip() for fl in line.split(' ') + if fl.startswith('-') and fl != '-o' + ] + elif " -c " in line and not compiler_flags: + compiler_flags = [ + fl for fl in line.split(' ') + if fl.startswith('-') and not fl.startswith('-I') + and fl not in ['-o', '-c', '-D'] + ] + + with open(os.path.join(source_dir, "tools", "nimc.Makefile"), "r") as mf_in: + lines_make = mf_in.readlines() + for ln in lines_make: + if ln.startswith("LIBS = "): + ln = "LIBS = -L. " + " ".join(linker_flags) + "\n" + if ln.startswith("CFLAGS = "): + ln = "CFLAGS = " + " ".join([f for f in compiler_flags if f != '-c']) + "\n" + mk.write(ln) + archive_path = os.path.join(temp_dir, f"build_{build_id}.tar.gz") - zip_base_path = os.path.join(temp_dir, f"build_{build_id}") - shutil.make_archive(zip_base_path, 'gztar', temp_dir, f"build_{build_id}") + zip_base = os.path.join(temp_dir, f"build_{build_id}") + shutil.make_archive(zip_base, 'gztar', temp_dir, f"build_{build_id}") return archive_path + def find_nim_executable(): - # Common paths for nim executable based on the operating system common_paths = { 'Windows': [ 'C:\\Program Files\\Nim\\bin\\nim.exe', 'C:\\Nim\\bin\\nim.exe' ], - 'Darwin': [ # macOS + 'Darwin': [ '/opt/homebrew/bin/nim', '/usr/local/bin/nim' ], @@ -410,19 +540,17 @@ def find_nim_executable(): ] } - # Check if nim is in the PATH if is_executable_in_path('nim'): - return 'nim' # nim is in the PATH + return 'nim' - # If not in PATH, check common paths based on the OS os_type = platform.system() for path in common_paths.get(os_type, []): if os.path.isfile(path) and os.access(path, os.X_OK): return path + return None - return None # nim not found -def is_executable_in_path(executable): +def is_executable_in_path(executable: str): try: subprocess.run([executable, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True @@ -430,35 +558,33 @@ def is_executable_in_path(executable): return False -def get_nim_version(executable_path): +def get_nim_version(executable_path: str): try: - result = subprocess.run([executable_path, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, + result = subprocess.run([executable_path, '--version'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - # Example output: "Nim Compiler Version 1.4.8 [Linux: amd64]" output = result.stdout.split('\n')[0] version_str = output.split()[3] return version.parse(version_str) except Exception as e: - print(f"An error occurred while getting Nim version: {e}") + print(f"Error getting Nim version: {e}") return None def find_nimbase_file(nim_executable: str): nimbase_paths: list[str] = [] - # Try to get paths from 'nim dump' try: - nim_dump_output = subprocess.run([nim_executable, "dump"], text=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.PIPE).stderr - # Extract paths that might contain 'nimbase.h' + nim_dump_output = subprocess.run( + [nim_executable, "dump"], text=True, + stdout=subprocess.DEVNULL, stderr=subprocess.PIPE + ).stderr nimbase_paths.extend(line for line in nim_dump_output.splitlines() if 'lib' in line) except subprocess.CalledProcessError as e: print(f"Error running 'nim dump': {e}") - # Add common paths based on the operating system os_type = platform.system() - if os_type == 'Darwin': # macOS + if os_type == 'Darwin': nimbase_paths.append('/usr/local/lib/nim') elif os_type == 'Linux': nimbase_paths.append('/usr/lib/nim') @@ -466,18 +592,16 @@ def find_nimbase_file(nim_executable: str): elif os_type == 'Windows': nimbase_paths.append('C:\\Nim\\lib') - # Search for nimbase.h in the collected paths for path in nimbase_paths: - nimbase_file = os.path.join(path, 'nimbase.h') - if os.path.isfile(nimbase_file): - return nimbase_file + nb_file = os.path.join(path, 'nimbase.h') + if os.path.isfile(nb_file): + return nb_file - if os_type == 'Darwin': # macOS + if os_type == 'Darwin': base_dir = '/opt/homebrew/Cellar/nim/' if os.path.exists(base_dir): - for version_dir in os.listdir(base_dir): - nimbase_path = os.path.join(base_dir, version_dir, 'nim', 'lib', 'nimbase.h') - if os.path.isfile(nimbase_path): - return nimbase_path - - return None # nimbase.h not found + for verdir in os.listdir(base_dir): + nb_file = os.path.join(base_dir, verdir, 'nim', 'lib', 'nimbase.h') + if os.path.isfile(nb_file): + return nb_file + return None diff --git a/backend/app/tasks/reset_frame.py b/backend/app/tasks/reset_frame.py index 99c0e9c3..cf62b186 100644 --- a/backend/app/tasks/reset_frame.py +++ b/backend/app/tasks/reset_frame.py @@ -1,16 +1,16 @@ from typing import Any from arq import ArqRedis +from sqlalchemy.orm import Session + from app.models.log import new_log as log from app.models.frame import Frame, update_frame -from sqlalchemy.orm import Session -from arq import ArqRedis as Redis async def reset_frame(id: int, redis: ArqRedis): await redis.enqueue_job("reset_frame", id=id) async def reset_frame_task(ctx: dict[str, Any], id: int): db: Session = ctx['db'] - redis: Redis = ctx['redis'] + redis: ArqRedis = ctx['redis'] frame = db.get(Frame, id) if frame and frame.status != 'uninitialized': diff --git a/backend/app/tasks/restart_frame.py b/backend/app/tasks/restart_frame.py index f125d6a0..3f8e7791 100644 --- a/backend/app/tasks/restart_frame.py +++ b/backend/app/tasks/restart_frame.py @@ -1,9 +1,10 @@ from typing import Any +from sqlalchemy.orm import Session +from arq import ArqRedis as Redis + from app.models.log import new_log as log from app.models.frame import Frame, update_frame from app.utils.ssh_utils import get_ssh_connection, exec_command, remove_ssh_connection -from sqlalchemy.orm import Session -from arq import ArqRedis as Redis async def restart_frame(id: int, redis: Redis): await redis.enqueue_job("restart_frame", id=id) @@ -41,4 +42,4 @@ async def restart_frame_task(ctx: dict[str, Any], id: int): if ssh is not None: ssh.close() await log(db, redis, id, "stdinfo", "SSH connection closed") - remove_ssh_connection(ssh) + await remove_ssh_connection(ssh) diff --git a/backend/app/tasks/stop_frame.py b/backend/app/tasks/stop_frame.py index fc453cd1..9c3a142c 100644 --- a/backend/app/tasks/stop_frame.py +++ b/backend/app/tasks/stop_frame.py @@ -1,19 +1,20 @@ from typing import Any from arq import ArqRedis +from sqlalchemy.orm import Session + from app.models.log import new_log as log from app.models.frame import Frame, update_frame from app.utils.ssh_utils import get_ssh_connection, exec_command, remove_ssh_connection -from sqlalchemy.orm import Session -from arq import ArqRedis as Redis async def stop_frame(id: int, redis: ArqRedis): await redis.enqueue_job("stop_frame", id=id) async def stop_frame_task(ctx: dict[str, Any], id: int): db: Session = ctx['db'] - redis: Redis = ctx['redis'] + redis: ArqRedis = ctx['redis'] ssh = None + frame = None try: frame = db.get(Frame, id) if not frame: @@ -38,4 +39,4 @@ async def stop_frame_task(ctx: dict[str, Any], id: int): if ssh is not None: ssh.close() await log(db, redis, id, "stdinfo", "SSH connection closed") - remove_ssh_connection(ssh) + await remove_ssh_connection(ssh) diff --git a/backend/app/tasks/worker.py b/backend/app/tasks/worker.py index 6d2bf3b8..d353c691 100644 --- a/backend/app/tasks/worker.py +++ b/backend/app/tasks/worker.py @@ -1,9 +1,7 @@ """ backend/app/tasks/worker.py -This file defines: -- The arq worker settings (how to run the worker). -- The tasks/coroutines that run via arq. +Defines the arq worker settings and the task functions that run via arq. """ from httpx import AsyncClient @@ -23,19 +21,13 @@ # Optional: on_startup logic async def startup(ctx: Dict[str, Any]): - """ - Example: if you want to open a single shared httpx session or DB session in the worker - """ ctx['client'] = AsyncClient() ctx['redis'] = create_redis_connection() ctx['db'] = SessionLocal() - print("Worker startup: created shared HTTPX client") + print("Worker startup: created shared HTTPX client, Redis, and DB session") # Optional: on_shutdown logic async def shutdown(ctx: Dict[str, Any]): - """ - Example: close that shared session - """ if 'client' in ctx: await ctx['client'].aclose() if 'redis' in ctx: @@ -43,13 +35,13 @@ async def shutdown(ctx: Dict[str, Any]): if 'db' in ctx: ctx['db'].close() - print("Worker shutdown: closed shared HTTPX client") + print("Worker shutdown: closed resources") class WorkerSettings: """ - WorkerSettings is what `arq` uses to actually run the worker process. - You will run it with `arq app.tasks.WorkerSettings`. + WorkerSettings is what `arq` uses to run the worker process. + You run it with: `arq app.tasks.worker.WorkerSettings`. """ functions = [ func(deploy_frame_task, name="deploy_frame"), @@ -60,14 +52,7 @@ class WorkerSettings: on_startup = startup on_shutdown = shutdown - # Connect to the same redis instance used in your app: redis_settings = REDIS_SETTINGS - - # Keep results for 1 hour (3600s) by default, or set any other retention - keep_result = 3600 - - # max concurrency: + keep_result = 3600 # Keep results for 1 hour max_jobs = 10 - - # If you want to allow job abort (stop/cancel): allow_abort_jobs = True diff --git a/backend/app/utils/ssh_utils.py b/backend/app/utils/ssh_utils.py index 9a085a52..09a74311 100644 --- a/backend/app/utils/ssh_utils.py +++ b/backend/app/utils/ssh_utils.py @@ -1,112 +1,173 @@ -from asyncio import sleep -import atexit -import signal import subprocess -from io import StringIO -from typing import Optional -from cryptography.hazmat.primitives.serialization import load_pem_private_key -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.backends import default_backend - -from paramiko import RSAKey, SSHClient, AutoAddPolicy - +from arq import ArqRedis +import asyncssh +import asyncio +from typing import Optional, List +from sqlalchemy.orm import Session +from app.models.log import new_log as log from app.models.frame import Frame from app.models.settings import Settings -from app.models.log import new_log as log -from sqlalchemy.orm import Session -from arq import ArqRedis as Redis - -ssh_connections: set[SSHClient] = set() -def close_ssh_connections(): - for ssh in ssh_connections: +async def remove_ssh_connection(ssh): + """ + Close the asyncssh connection. + """ + if ssh: + ssh.close() + # Wait for the connection to be fully closed try: - ssh.close() - print("SSH connection closed.") - except: # noqa: E722 + await ssh.wait_closed() + except asyncio.CancelledError: pass -atexit.register(close_ssh_connections) - -def handle_signal(signum, frame): - close_ssh_connections() - exit(1) - -signal.signal(signal.SIGTERM, handle_signal) -signal.signal(signal.SIGINT, handle_signal) - -def remove_ssh_connection(ssh: SSHClient): - ssh_connections.remove(ssh) - -async def get_ssh_connection(db: Session, redis: Redis, frame: Frame) -> SSHClient: - ssh_type = '(password)' if frame.ssh_pass else '(keypair)' - await log(db, redis, frame.id, "stdinfo", f"Connecting via SSH to {frame.ssh_user}@{frame.frame_host} {ssh_type}") - ssh = SSHClient() - ssh_connections.add(ssh) - ssh.set_missing_host_key_policy(AutoAddPolicy()) - - if frame.ssh_pass: - ssh.connect(frame.frame_host, username=frame.ssh_user, password=frame.ssh_pass, timeout=30) - else: - ssh_keys = db.query(Settings).filter_by(key="ssh_keys").first() - default_key: Optional[str] = None - if ssh_keys and ssh_keys.value: - default_key = ssh_keys.value.get("default", None) - if default_key: - try: - private_key_cryptography = load_pem_private_key( - default_key.encode(), - password=None, - backend=default_backend() +async def get_ssh_connection(db, redis, frame): + """ + Create and return an asyncssh connection object to the frame. + """ + host = frame.frame_host + port = frame.ssh_port or 22 + username = frame.ssh_user + password = frame.ssh_pass + + await log(db, redis, frame.id, "stdinfo", + f"Connecting via SSH to {username}@{host} " + f"({'password' if password else 'keypair'})") + + # 1) If password is set, just do password-based auth + # 2) Otherwise, load the private key from DB + client_keys = [] + if not password: + # Attempt to load SSH keys from DB + ssh_keys_row = db.query(Settings).filter_by(key="ssh_keys").first() + if ssh_keys_row and ssh_keys_row.value: + default_key = ssh_keys_row.value.get("default", None) + if default_key: + # Convert string -> asyncssh private key object + try: + # asyncssh can parse the key directly: + private_key_obj = asyncssh.import_private_key(default_key) + except (asyncssh.KeyImportError, TypeError): + # If that fails, see if there's any other fallback + raise Exception("Could not parse the private key from DB. " + "Check that it’s in valid PEM format.") + + client_keys = [private_key_obj] + else: + raise Exception("Key-based auth chosen but no default key found in DB.") + else: + raise Exception("No password set and no SSH keys found in settings. " + "Either set a password or store a default key under 'ssh_keys'.") + + try: + ssh = await asyncssh.connect( + host=host, + port=port, + username=username, + password=password if password else None, + client_keys=client_keys if not password else None, + known_hosts=None # disable known_hosts checking (or provide a file) + ) + await log(db, redis, frame.id, "stdinfo", + f"SSH connection established to {username}@{host}") + return ssh + except (OSError, asyncssh.Error) as exc: + raise Exception(f"Unable to connect to {host}:{port} via SSH: {exc}") + + +async def exec_command(db, redis, frame, ssh, command: str, + output: Optional[List[str]] = None, + log_output: bool = True, + raise_on_error: bool = True) -> int: + """ + Execute a command on the remote host using an existing SSH connection. + Stream stdout and stderr lines as they arrive, optionally storing them + into 'output' and logging them in the database. + Returns the process exit status. + """ + + await log(db, redis, frame.id, "stdout", f"> {command}") + + # We will capture output in these buffers if needed + stdout_buffer = [] + stderr_buffer = [] + + try: + # Start the remote process + process = await ssh.create_process(command) + + # Create tasks to read stdout and stderr lines in parallel + stdout_task = asyncio.create_task( + _stream_lines( + db, redis, frame, process.stdout, "stdout", + log_output, stdout_buffer if output is not None else None + ) + ) + stderr_task = asyncio.create_task( + _stream_lines( + db, redis, frame, process.stderr, "stderr", + log_output, stderr_buffer if output is not None else None + ) + ) + + # Wait for both streaming tasks to complete + await asyncio.gather(stdout_task, stderr_task) + + # Wait for the process to exit + respoonse = await process.wait() + exit_status = respoonse.exit_status + + # If the caller wants the entire stdout combined, put it into output + # (We only store stdout in `output`, but you can also append stderr if desired.) + if output is not None: + stdout_data = "".join(stdout_buffer) + output.append(stdout_data) + + # Handle non-zero exit + if exit_status != 0: + # Grab final aggregated output for the exception details + stderr_data = "".join(stderr_buffer).strip() + stdout_data = "".join(stdout_buffer).strip() + + if raise_on_error: + raise Exception( + f"Command '{command}' failed with code {exit_status}\n" + f"stderr: {stderr_data}\n" + f"stdout: {stdout_data}" ) - pem = private_key_cryptography.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption() + else: + await log( + db, redis, frame.id, "exit_status", + f"The command exited with status {exit_status}" ) - ssh_key_obj = RSAKey(file_obj=StringIO(pem.decode())) - except: # noqa: E722 - ssh_key_obj = RSAKey.from_private_key(StringIO(default_key)) - ssh.connect(frame.frame_host, username=frame.ssh_user, pkey=ssh_key_obj, timeout=30) - else: - raise Exception("Set up SSH keys in the settings page, or provide a password for the frame") - await log(db, redis, int(frame.id), "stdinfo", f"Connected via SSH to {frame.ssh_user}@{frame.frame_host}") - return ssh - - -async def exec_command(db: Session, redis: Redis, frame: Frame, ssh: SSHClient, command: str, output: Optional[list[str]] = None, raise_on_error = True, log_output = True) -> int: - await log(db, redis, int(frame.id), "stdout", f"> {command}") - _stdin, stdout, stderr = ssh.exec_command(command) - exit_status = None - while exit_status is None: - while line := stdout.readline(): - if log_output: - await log(db, redis, int(frame.id), "stdout", line) - if output is not None: - output.append(line) - while line := stderr.readline(): - if log_output: - await log(db, redis, int(frame.id), "stderr", line) - if output is not None: - output.append(line) - - # Check if the command has finished running - if stdout.channel.exit_status_ready(): - exit_status = stdout.channel.recv_exit_status() - - # Sleep to prevent busy-waiting - await sleep(0.1) - if exit_status != 0: - if raise_on_error: - raise Exception(f"Command exited with status {exit_status}") - else: - await log(db, redis, int(frame.id), "exit_status", f"The command exited with status {exit_status}") + return exit_status + + except asyncssh.ProcessError as e: + # If the remote command cannot even be started + raise Exception(f"Error running command '{command}': {e}") from e + + +async def _stream_lines(db, redis, frame, stream, log_type: str, + log_output: bool, buffer_list: Optional[List[str]]): + """ + Helper coroutine that reads lines from `stream` (stdout or stderr) + and writes them to the DB log and/or appends them to buffer_list for + later use, as each line arrives. + """ + while True: + line = await stream.readline() + if not line: # no more data + break + + if buffer_list is not None: + buffer_list.append(line) - return exit_status + if log_output: + # Optionally strip the trailing newline for cleaner logs + await log(db, redis, frame.id, log_type, line.rstrip('\n')) -async def exec_local_command(db: Session, redis: Redis, frame: Frame, command: str, generate_log = True) -> tuple[int, Optional[str], Optional[str]]: +async def exec_local_command(db: Session, redis: ArqRedis, frame: Frame, command: str, generate_log = True) -> tuple[int, Optional[str], Optional[str]]: if generate_log: await log(db, redis, int(frame.id), "stdout", f"$ {command}") process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) @@ -127,7 +188,7 @@ async def exec_local_command(db: Session, redis: Redis, frame: Frame, command: s break if process.poll() is not None: break_next = True - await sleep(0.1) + await asyncio.sleep(0.1) exit_status = process.returncode diff --git a/backend/requirements.in b/backend/requirements.in index d664eda5..0df1288a 100644 --- a/backend/requirements.in +++ b/backend/requirements.in @@ -1,5 +1,6 @@ alembic arq +asyncssh dacite email_validator fastapi[standard] diff --git a/backend/requirements.txt b/backend/requirements.txt index d900625c..141d7112 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -13,6 +13,7 @@ appnope==0.1.4 arq==0.26.1 asttokens==2.4.0 # via stack-data +asyncssh==2.19.0 backcall==0.2.0 # via ipython bcrypt==4.0.1 @@ -47,6 +48,7 @@ click==8.1.7 # uvicorn cryptography==41.0.3 # via + # asyncssh # jwt # paramiko # types-paramiko @@ -254,6 +256,7 @@ typing-extensions==4.12.2 # via # alembic # anyio + # asyncssh # fastapi # mypy # pydantic diff --git a/gpt.sh b/gpt.sh index 199ae40f..fe6cdfec 100755 --- a/gpt.sh +++ b/gpt.sh @@ -5,29 +5,27 @@ OUTPUT="gpt.txt" # Start fresh > "$OUTPUT" -echo "I have this FastAPI backend app with a React frontend. Help me..." >> "$OUTPUT" - echo "" >> "$OUTPUT" # Define the patterns you want to collect files from: patterns=( - "docker-entrypoint.sh" - "Dockerfile" + # "docker-entrypoint.sh" + # "Dockerfile" "backend/app/*.py" - # "backend/app/schemas/*.py" + "backend/app/schemas/*.py" "backend/app/api/*.py" - "backend/app/api/tests/*.py" + # "backend/app/api/tests/*.py" # "backend/app/api/tests/test_frames.py" # "backend/app/api/tests/test_settings.py" - "backend/app/models/*.py" - # "backend/app/tasks/*.py" + # "backend/app/models/*.py" + "backend/app/tasks/*.py" # "backend/app/models/tests/*.py" - "frontend/src/urls.ts" - "frontend/src/main.tsx" - "frontend/src/types.tsx" - "frontend/src/scenes/App.tsx" - "frontend/src/scenes/scenes.tsx" - "frontend/src/scenes/sceneLogic.tsx" + # "frontend/src/urls.ts" + # "frontend/src/main.tsx" + # "frontend/src/types.tsx" + # "frontend/src/scenes/App.tsx" + # "frontend/src/scenes/scenes.tsx" + # "frontend/src/scenes/sceneLogic.tsx" # "frameos/src/apps/*/*/config.json" )