Skip to content

Commit

Permalink
Merge pull request #185 from polywrap/dev
Browse files Browse the repository at this point in the history
Release Dev Feb 18
  • Loading branch information
cbrzn authored Feb 18, 2024
2 parents fe52c16 + 140f648 commit 17c2d71
Show file tree
Hide file tree
Showing 8 changed files with 1,936 additions and 1,628 deletions.
1 change: 1 addition & 0 deletions .env.template
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
RUNTIME=local
WORKERS_URL=
NEXTAUTH_URL=
NEXTAUTH_SECRET=
Expand Down
13 changes: 9 additions & 4 deletions workers/fund_public_goods/api/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from fund_public_goods.lib.strategy.utils.initialize_logs import initialize_logs
from fund_public_goods.db import tables, entities, app_db
from supabase.lib.client_options import ClientOptions
from fastapi import APIRouter, HTTPException, Header, BackgroundTasks
from fastapi import APIRouter, HTTPException, Header
from fastapi_events.dispatcher import dispatch
from pydantic import BaseModel
from typing import Optional
import os
Expand All @@ -19,7 +20,7 @@ class Response(BaseModel):
run_id: str

@router.post("/api/runs")
async def runs(background_tasks: BackgroundTasks, params: Params, authorization: Optional[str] = Header(None)) -> Response:
async def runs(params: Params, authorization: Optional[str] = Header(None)) -> Response:
if authorization:
supabase_auth_token = authorization.split(" ")[1]
else:
Expand All @@ -30,11 +31,15 @@ async def runs(background_tasks: BackgroundTasks, params: Params, authorization:
raise HTTPException(status_code=400, detail="Prompt cannot be empty.")
db = app_db.create(options=ClientOptions())
db.postgrest.auth(supabase_auth_token)

run_id = tables.runs.insert(entities.Runs(
prompt=prompt
), db)
initialize_logs(run_id)
background_tasks.add_task(create, run_id, authorization)

dispatch(
"create-strategy",
payload={"run_id": run_id, "authorization": authorization}
)

return Response(run_id=run_id)
63 changes: 63 additions & 0 deletions workers/fund_public_goods/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dataclasses import dataclass
from typing import Any, Union
from aws_lambda_typing.events import SQSEvent
from fastapi_events.typing import Event as LocalEvent
from fastapi_events.handlers.local import local_handler, BaseEventHandler
from fastapi_events.handlers.aws import SQSForwardHandler
from fastapi_events.middleware import EventHandlerASGIMiddleware
from fund_public_goods.lib.strategy.create import create
import os
import json
from fastapi import FastAPI

@dataclass
class EventData:
name: str
payload: dict[str, Any]

def handler(event: EventData):
if event.name == "create-strategy":
run_id = event.payload["run_id"]
authorization = event.payload["authorization"]
create(run_id, authorization)
else:
raise Exception("Unknown event name!")

@local_handler.register(event_name="*")
def local_handle(local_event: LocalEvent):
event_name, payload = local_event
event = EventData(name=str(event_name), payload=payload)
handler(event)

def sqs_handler(sqs_event: SQSEvent):
events: list[EventData] = []

for record in sqs_event['Records']:
message_body: str = record['body']
message: dict[str, Any] = json.loads(message_body)
events.append(EventData(
name=message["name"],
payload=message["payload"]
))

for event in events:
handler(event)

def add_event_middleware(app: FastAPI):
event_handlers: list[BaseEventHandler] = []
env = os.getenv("RUNTIME")

if env == "cloud":
event_handlers = [
SQSForwardHandler(
queue_url="https://sqs.us-east-1.amazonaws.com/815880329304/fpg",
region_name="us-east-1"
)
]
else:
event_handlers = [local_handler]

app.add_middleware(
EventHandlerASGIMiddleware,
handlers=event_handlers
)
4 changes: 1 addition & 3 deletions workers/fund_public_goods/lib/strategy/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
from fund_public_goods.lib.strategy.utils.fetch_matching_projects import fetch_matching_projects
from fund_public_goods.lib.strategy.utils.summarize_descriptions import summarize_descriptions
from supabase.lib.client_options import ClientOptions
from fastapi import APIRouter, Header, HTTPException
from fastapi import Header, HTTPException
from pydantic import BaseModel
from typing import Optional, cast
from langchain_community.callbacks import get_openai_callback


router = APIRouter()

class Params(BaseModel):
run_id: str

Expand Down
3 changes: 3 additions & 0 deletions workers/fund_public_goods/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from fastapi import FastAPI
import inngest.fast_api
from mangum import Mangum
from fund_public_goods.events import add_event_middleware
from fund_public_goods.inngest_client import inngest_client
from fund_public_goods.workflows.index_gitcoin.functions import functions as index_gitcoin_functions
from fund_public_goods.workflows.egress_gitcoin.functions import functions as egress_gitcoin_functions
Expand All @@ -34,4 +35,6 @@
app.include_router(funding_entries.router)
app.include_router(get_version_router)

add_event_middleware(app)

handler = Mangum(app=app)
Loading

0 comments on commit 17c2d71

Please sign in to comment.