Skip to content

Commit

Permalink
Async SSH (#113)
Browse files Browse the repository at this point in the history
* async ssh

* fix ssh

* try ha build

* cleaner script
  • Loading branch information
mariusandra authored Dec 31, 2024
1 parent f109ec0 commit 2969761
Show file tree
Hide file tree
Showing 11 changed files with 572 additions and 369 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/docker-publish-multi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,32 @@ jobs:
tags: |
frameos/frameos:${{ env.BRANCH_NAME }}-${{ env.SHORT_SHA }}
frameos/frameos:latest
update-addon-repo:
name: Update Home Assistant Addon
needs: push-multiarch
runs-on: ubuntu-latest
steps:
- name: Checkout frameos-home-assistant-addon
uses: actions/checkout@v2
with:
repository: frameos/frameos-home-assistant-addon
token: ${{ secrets.ACTIONS_WRITE_TOKEN }}
ref: main
path: home-assistant-addon

- name: Update version in config.yml
run: |
cd home-assistant-addon/frameos
echo "Updating version in config.yml to main-${{ env.SHORT_SHA }}"
sed -i "s/^version: .*/version: main-${{ env.SHORT_SHA }}/" config.yml
- name: Commit changes
uses: EndBug/add-and-commit@v9
with:
message: "Update FrameOS version to main-${{ env.SHORT_SHA }}"
add: "."
push: true
author_name: FrameOS Bot
author_email: [email protected]

4 changes: 2 additions & 2 deletions backend/app/api/frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async def api_frame_get_assets(id: int, db: Session = Depends(get_db), redis: Re
command = f"find {assets_path} -type f -exec stat --format='%s %Y %n' {{}} +"
output: list[str] = []
await exec_command(db, redis, frame, ssh, command, output, log_output=False)
remove_ssh_connection(ssh)
await remove_ssh_connection(ssh)

assets = []
for line in output:
Expand Down Expand Up @@ -275,7 +275,7 @@ async def api_frame_get_asset(id: int, request: Request, db: Session = Depends(g
}
)
finally:
remove_ssh_connection(ssh)
await remove_ssh_connection(ssh)
except HTTPException:
raise
except Exception as e:
Expand Down
574 changes: 349 additions & 225 deletions backend/app/tasks/deploy_frame.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions backend/app/tasks/reset_frame.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from typing import Any
from arq import ArqRedis
from sqlalchemy.orm import Session

from app.models.log import new_log as log
from app.models.frame import Frame, update_frame
from sqlalchemy.orm import Session
from arq import ArqRedis as Redis

async def reset_frame(id: int, redis: ArqRedis):
await redis.enqueue_job("reset_frame", id=id)

async def reset_frame_task(ctx: dict[str, Any], id: int):
db: Session = ctx['db']
redis: Redis = ctx['redis']
redis: ArqRedis = ctx['redis']

frame = db.get(Frame, id)
if frame and frame.status != 'uninitialized':
Expand Down
7 changes: 4 additions & 3 deletions backend/app/tasks/restart_frame.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Any
from sqlalchemy.orm import Session
from arq import ArqRedis as Redis

from app.models.log import new_log as log
from app.models.frame import Frame, update_frame
from app.utils.ssh_utils import get_ssh_connection, exec_command, remove_ssh_connection
from sqlalchemy.orm import Session
from arq import ArqRedis as Redis

async def restart_frame(id: int, redis: Redis):
await redis.enqueue_job("restart_frame", id=id)
Expand Down Expand Up @@ -41,4 +42,4 @@ async def restart_frame_task(ctx: dict[str, Any], id: int):
if ssh is not None:
ssh.close()
await log(db, redis, id, "stdinfo", "SSH connection closed")
remove_ssh_connection(ssh)
await remove_ssh_connection(ssh)
9 changes: 5 additions & 4 deletions backend/app/tasks/stop_frame.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
from typing import Any
from arq import ArqRedis
from sqlalchemy.orm import Session

from app.models.log import new_log as log
from app.models.frame import Frame, update_frame
from app.utils.ssh_utils import get_ssh_connection, exec_command, remove_ssh_connection
from sqlalchemy.orm import Session
from arq import ArqRedis as Redis

async def stop_frame(id: int, redis: ArqRedis):
await redis.enqueue_job("stop_frame", id=id)

async def stop_frame_task(ctx: dict[str, Any], id: int):
db: Session = ctx['db']
redis: Redis = ctx['redis']
redis: ArqRedis = ctx['redis']

ssh = None
frame = None
try:
frame = db.get(Frame, id)
if not frame:
Expand All @@ -38,4 +39,4 @@ async def stop_frame_task(ctx: dict[str, Any], id: int):
if ssh is not None:
ssh.close()
await log(db, redis, id, "stdinfo", "SSH connection closed")
remove_ssh_connection(ssh)
await remove_ssh_connection(ssh)
27 changes: 6 additions & 21 deletions backend/app/tasks/worker.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
"""
backend/app/tasks/worker.py
This file defines:
- The arq worker settings (how to run the worker).
- The tasks/coroutines that run via arq.
Defines the arq worker settings and the task functions that run via arq.
"""

from httpx import AsyncClient
Expand All @@ -23,33 +21,27 @@

# Optional: on_startup logic
async def startup(ctx: Dict[str, Any]):
"""
Example: if you want to open a single shared httpx session or DB session in the worker
"""
ctx['client'] = AsyncClient()
ctx['redis'] = create_redis_connection()
ctx['db'] = SessionLocal()
print("Worker startup: created shared HTTPX client")
print("Worker startup: created shared HTTPX client, Redis, and DB session")

# Optional: on_shutdown logic
async def shutdown(ctx: Dict[str, Any]):
"""
Example: close that shared session
"""
if 'client' in ctx:
await ctx['client'].aclose()
if 'redis' in ctx:
await ctx['redis'].close()
if 'db' in ctx:
ctx['db'].close()

print("Worker shutdown: closed shared HTTPX client")
print("Worker shutdown: closed resources")


class WorkerSettings:
"""
WorkerSettings is what `arq` uses to actually run the worker process.
You will run it with `arq app.tasks.WorkerSettings`.
WorkerSettings is what `arq` uses to run the worker process.
You run it with: `arq app.tasks.worker.WorkerSettings`.
"""
functions = [
func(deploy_frame_task, name="deploy_frame"),
Expand All @@ -60,14 +52,7 @@ class WorkerSettings:
on_startup = startup
on_shutdown = shutdown

# Connect to the same redis instance used in your app:
redis_settings = REDIS_SETTINGS

# Keep results for 1 hour (3600s) by default, or set any other retention
keep_result = 3600

# max concurrency:
keep_result = 3600 # Keep results for 1 hour
max_jobs = 10

# If you want to allow job abort (stop/cancel):
allow_abort_jobs = True
Loading

0 comments on commit 2969761

Please sign in to comment.