Skip to content

Commit

Permalink
websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra committed Dec 20, 2024
1 parent fb670ba commit ee907e9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
4 changes: 3 additions & 1 deletion backend/app/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
Expand All @@ -11,12 +12,13 @@
from alembic.config import Config as AlembicConfig
from alembic import command as alembic_command

from app.services.ws_broadcast import register_ws_routes
from app.services.ws_broadcast import register_ws_routes, redis_listener
from app.config import get_config
from backend.app.utils.sentry import initialize_sentry

def lifespan(app: FastAPI):
initialize_sentry()
asyncio.create_task(redis_listener())
yield

app = FastAPI(lifespan=lifespan)
Expand Down
39 changes: 19 additions & 20 deletions backend/app/services/ws_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@

from app.config import get_config

redis = None
redis_pub = None
redis_sub = None

async def init_redis():
global redis
if redis is None:
redis = create_redis(get_config().REDIS_URL, decode_responses=True)

async def publish_message(event: str, data: dict):
if not redis:
await init_redis()
msg = {"event": event, "data": data}
await redis.publish("broadcast_channel", json.dumps(msg))

global redis_pub, redis_sub
if redis_pub is None:
redis_pub = create_redis(get_config().REDIS_URL, decode_responses=True)
if redis_sub is None:
redis_sub = create_redis(get_config().REDIS_URL, decode_responses=True)

class ConnectionManager:
def __init__(self):
Expand Down Expand Up @@ -50,18 +46,25 @@ async def broadcast(self, message: str):
print(f"Error sending message to {connection.client}: {e}")
await self.disconnect(connection)

manager = ConnectionManager()
manager = ConnectionManager() # Local clients

async def redis_listener():
await init_redis()
pubsub = redis.pubsub()
if not redis_sub:
await init_redis()
pubsub = redis_sub.pubsub()
await pubsub.subscribe("broadcast_channel")

async for message in pubsub.listen():
if message["type"] == "message":
msg = message["data"]
await manager.broadcast(msg)
await manager.broadcast(message["data"])

async def publish_message(event: str, data: dict):
if not redis_pub:
await init_redis()
msg = {"event": event, "data": data}
await redis_pub.publish("broadcast_channel", json.dumps(msg))
# TODO: broadcast to local clients directly, to the rest via redis
# await manager.broadcast(json.dumps(msg))

def register_ws_routes(app):
@app.websocket("/ws")
Expand All @@ -78,8 +81,4 @@ async def websocket_endpoint(websocket: WebSocket):
print(f"Error: {e}")
await manager.disconnect(websocket)

@app.on_event("startup")
async def startup_event():
asyncio.create_task(redis_listener())

return manager

0 comments on commit ee907e9

Please sign in to comment.